diff --git a/src/agents/agent-command.live-model-switch.test.ts b/src/agents/agent-command.live-model-switch.test.ts index 582388feac6..467e82dfcd4 100644 --- a/src/agents/agent-command.live-model-switch.test.ts +++ b/src/agents/agent-command.live-model-switch.test.ts @@ -45,6 +45,8 @@ const state = vi.hoisted(() => ({ resolvedSkills: [], version: 0, })), + prepareInternalSessionEffectsTranscriptMock: vi.fn(), + removeInternalSessionEffectsTranscriptMock: vi.fn(), authProfileStoreMock: { profiles: {} } as { profiles: Record }, sessionEntryMock: undefined as unknown, sessionStoreMock: undefined as unknown, @@ -211,6 +213,13 @@ vi.mock("../config/sessions/transcript-resolve.runtime.js", () => ({ }), })); +vi.mock("./internal-session-effects.js", () => ({ + prepareInternalSessionEffectsTranscript: (...args: unknown[]) => + state.prepareInternalSessionEffectsTranscriptMock(...args), + removeInternalSessionEffectsTranscript: (...args: unknown[]) => + state.removeInternalSessionEffectsTranscriptMock(...args), +})); + vi.mock("../infra/agent-events.js", () => ({ clearAgentRunContext: (...args: unknown[]) => state.clearAgentRunContextMock(...args), emitAgentEvent: (...args: unknown[]) => state.emitAgentEventMock(...args), @@ -808,6 +817,10 @@ describe("agentCommand – LiveSessionModelSwitchError retry", () => { state.deliverAgentCommandResultMock.mockResolvedValue(undefined); state.updateSessionStoreAfterAgentRunMock.mockResolvedValue(undefined); state.trajectoryFlushMock.mockResolvedValue(undefined); + state.prepareInternalSessionEffectsTranscriptMock.mockResolvedValue( + "/tmp/openclaw-internal-run.jsonl", + ); + state.removeInternalSessionEffectsTranscriptMock.mockResolvedValue(undefined); }); afterEach(() => { @@ -900,6 +913,46 @@ describe("agentCommand – LiveSessionModelSwitchError retry", () => { expect(stored?.pendingFinalDeliveryIntentId).toBeUndefined(); }); + it("keeps internal session-effect CLI runs out of visible session state", async () => { + setupSingleAttemptFallback(); + const visibleEntry: SessionEntry = { + sessionId: "session-1", + updatedAt: 1, + sessionFile: "/tmp/session.jsonl", + providerOverride: "anthropic", + modelOverride: "claude", + modelOverrideSource: "user", + skillsSnapshot: { prompt: "visible", skills: [{ name: "existing" }], version: 1 }, + }; + const sessionStore: Record = { "agent:main:main": visibleEntry }; + state.sessionEntryMock = visibleEntry; + state.sessionStoreMock = sessionStore; + state.storePathMock = "/tmp/openclaw-session-store.json"; + const attemptCalls: Array<{ sessionFile?: string; sessionEntry?: SessionEntry }> = []; + state.runAgentAttemptMock.mockImplementation(async (params) => { + attemptCalls.push(params as { sessionFile?: string; sessionEntry?: SessionEntry }); + return makeSuccessResult("openai", "gpt-5.4"); + }); + + await agentCommand({ + message: "internal resume", + to: "+1234567890", + sessionEffects: "internal", + suppressPromptPersistence: true, + }); + + expect(state.prepareInternalSessionEffectsTranscriptMock).toHaveBeenCalledWith({ + sessionFile: "/tmp/session.jsonl", + runId: expect.any(String), + }); + expect(attemptCalls).toHaveLength(1); + expect(attemptCalls[0]?.sessionFile).toBe("/tmp/openclaw-internal-run.jsonl"); + expect(attemptCalls[0]?.sessionEntry).toBe(visibleEntry); + expect(state.persistSessionEntryMock).not.toHaveBeenCalled(); + expect(state.updateSessionStoreAfterAgentRunMock).not.toHaveBeenCalled(); + expect(sessionStore["agent:main:main"]).toBe(visibleEntry); + }); + it("does not duplicate finishing lifecycle when an attempt already emitted finishing", async () => { setupModelSwitchRetry({ provider: "openai", diff --git a/src/agents/agent-command.ts b/src/agents/agent-command.ts index 6666f667c0d..5ee1658ebfc 100644 --- a/src/agents/agent-command.ts +++ b/src/agents/agent-command.ts @@ -74,6 +74,7 @@ import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "./defaults.js"; import { resolveFastModeState } from "./fast-mode.js"; import { ensureSelectedAgentHarnessPlugin } from "./harness/runtime-plugin.js"; import { resolveAvailableAgentHarnessPolicy } from "./harness/selection.js"; +import { prepareInternalSessionEffectsTranscript } from "./internal-session-effects.js"; import { AGENT_LANE_SUBAGENT } from "./lanes.js"; import { LiveSessionModelSwitchError } from "./live-model-switch.js"; import { loadManifestModelCatalog } from "./model-catalog.js"; @@ -529,6 +530,7 @@ async function agentCommandInternal( ) { const resolvedDeps = await resolveAgentCommandDeps(deps); const isRawModelRun = opts.modelRun === true || opts.promptMode === "none"; + const suppressVisibleSessionEffects = opts.sessionEffects === "internal"; const prepared = await prepareAgentCommandExecution(opts, runtime); const { body, @@ -580,9 +582,14 @@ async function agentCommandInternal( if (!isRawModelRun && acpResolution?.kind === "ready" && sessionKey) { const attemptExecutionRuntime = await loadAttemptExecutionRuntime(); const startedAt = Date.now(); - registerAgentRunContext(runId, { - sessionKey, - }); + registerAgentRunContext( + runId, + suppressVisibleSessionEffects + ? { isControlUiVisible: false } + : { + sessionKey, + }, + ); attemptExecutionRuntime.emitAcpLifecycleStart({ runId, startedAt }); const visibleTextAccumulator = attemptExecutionRuntime.createAcpVisibleTextAccumulator(); @@ -676,21 +683,53 @@ async function agentCommandInternal( const finalTextRaw = visibleTextAccumulator.finalizeRaw(); const finalText = visibleTextAccumulator.finalize(); try { - const { resolveAcpSessionCwd } = await loadAcpSessionIdentifiersRuntime(); + const [{ resolveAcpSessionCwd }, { resolveSessionTranscriptFile }] = await Promise.all([ + loadAcpSessionIdentifiersRuntime(), + loadTranscriptResolveRuntime(), + ]); + const internalSource = suppressVisibleSessionEffects + ? await resolveSessionTranscriptFile({ + sessionId, + sessionKey, + sessionEntry, + agentId: sessionAgentId, + threadId: opts.threadId, + }) + : undefined; + const internalSessionFile = suppressVisibleSessionEffects + ? await prepareInternalSessionEffectsTranscript({ + sessionFile: internalSource?.sessionFile, + runId, + }) + : undefined; + const transcriptSessionEntry: SessionEntry | undefined = internalSessionFile + ? { + ...(sessionEntry ?? { + sessionId, + updatedAt: Date.now(), + sessionStartedAt: Date.now(), + }), + sessionId, + sessionFile: internalSessionFile, + } + : sessionEntry; sessionEntry = await attemptExecutionRuntime.persistAcpTurnTranscript({ body, transcriptBody, finalText: finalTextRaw, sessionId, sessionKey, - sessionEntry, - sessionStore, - storePath, + sessionEntry: transcriptSessionEntry, + sessionStore: suppressVisibleSessionEffects ? undefined : sessionStore, + storePath: suppressVisibleSessionEffects ? undefined : storePath, sessionAgentId, threadId: opts.threadId, sessionCwd: resolveAcpSessionCwd(acpResolution.meta) ?? workspaceDir, config: cfg, }); + if (internalSessionFile) { + sessionEntry = prepared.sessionEntry; + } } catch (error) { log.warn( `ACP transcript persistence failed for ${sessionKey}: ${formatErrorMessage(error)}`, @@ -722,10 +761,11 @@ async function agentCommandInternal( const resolvedVerboseLevel = verboseOverride ?? persistedVerbose ?? (agentCfg?.verboseDefault as VerboseLevel | undefined); - if (sessionKey) { + if (sessionKey || suppressVisibleSessionEffects) { registerAgentRunContext(runId, { - sessionKey, + ...(sessionKey && !suppressVisibleSessionEffects ? { sessionKey } : {}), verboseLevel: resolvedVerboseLevel, + isControlUiVisible: !suppressVisibleSessionEffects, }); } @@ -772,7 +812,13 @@ async function agentCommandInternal( ? undefined : await hydrateResolvedSkillsAsync(currentSkillsSnapshot, buildSkillsSnapshot); - if (skillsSnapshot && sessionStore && sessionKey && needsSkillsSnapshot) { + if ( + skillsSnapshot && + sessionStore && + sessionKey && + needsSkillsSnapshot && + !suppressVisibleSessionEffects + ) { const now = Date.now(); const current = sessionEntry ?? { sessionId, @@ -796,7 +842,7 @@ async function agentCommandInternal( } // Persist explicit /command overrides to the session store when we have a key. - if (sessionStore && sessionKey) { + if (sessionStore && sessionKey && !suppressVisibleSessionEffects) { const now = Date.now(); const entry = sessionStore[sessionKey] ?? sessionEntry ?? { sessionId, updatedAt: now, sessionStartedAt: now }; @@ -877,7 +923,13 @@ async function agentCommandInternal( allowedModelCatalog = visibilityPolicy.allowedCatalog; } - if (sessionEntry && sessionStore && sessionKey && hasStoredOverride) { + if ( + sessionEntry && + sessionStore && + sessionKey && + hasStoredOverride && + !suppressVisibleSessionEffects + ) { const entry = sessionEntry; const repaired = repairProviderWrappedModelOverride({ entry, @@ -1029,7 +1081,7 @@ async function agentCommandInternal( authProfileOverrideSource: undefined, authProfileOverrideCompactionCount: undefined, }; - } else if (sessionStore && sessionKey) { + } else if (sessionStore && sessionKey && !suppressVisibleSessionEffects) { await clearSessionAuthProfileOverride({ sessionEntry: entry, sessionStore, @@ -1083,7 +1135,8 @@ async function agentCommandInternal( sessionEntry && sessionStore && sessionKey && - sessionEntry.thinkingLevel === previousThinkLevel + sessionEntry.thinkingLevel === previousThinkLevel && + !suppressVisibleSessionEffects ) { const entry = sessionEntry; entry.thinkingLevel = fallbackThinkLevel; @@ -1103,8 +1156,8 @@ async function agentCommandInternal( const resolvedSessionFile = await resolveSessionTranscriptFile({ sessionId, sessionKey, - sessionStore, - storePath, + sessionStore: suppressVisibleSessionEffects ? undefined : sessionStore, + storePath: suppressVisibleSessionEffects ? undefined : storePath, sessionEntry, agentId: sessionAgentId, threadId: opts.threadId, @@ -1124,6 +1177,9 @@ async function agentCommandInternal( sessionFile = resolvedSessionFile.sessionFile; sessionEntry = resolvedSessionFile.sessionEntry; } + const attemptSessionFile = suppressVisibleSessionEffects + ? await prepareInternalSessionEffectsTranscript({ sessionFile, runId }) + : sessionFile; const startedAt = Date.now(); const attemptLifecycleState = { @@ -1295,7 +1351,7 @@ async function agentCommandInternal( sessionId, sessionKey, sessionAgentId, - sessionFile, + sessionFile: attemptSessionFile, workspaceDir, body, isFallbackRetry, @@ -1317,11 +1373,12 @@ async function agentCommandInternal( resolvedVerboseLevel, agentDir, authProfileProvider: providerForAuthProfileValidation, - sessionStore, - storePath, + sessionStore: suppressVisibleSessionEffects ? undefined : sessionStore, + storePath: suppressVisibleSessionEffects ? undefined : storePath, allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe, sessionHasHistory: - !isNewSession || (await attemptExecutionRuntime.sessionFileHasContent(sessionFile)), + !isNewSession || + (await attemptExecutionRuntime.sessionFileHasContent(attemptSessionFile)), suppressPromptPersistenceOnRetry: opts.suppressPromptPersistence === true || (isFallbackRetry && attemptLifecycleState.currentTurnUserMessagePersisted), @@ -1340,6 +1397,7 @@ async function agentCommandInternal( sessionEntry && sessionStore && sessionKey && + !suppressVisibleSessionEffects && entryMatchesAutoFallbackPrimaryProbe(sessionEntry, autoFallbackPrimaryProbe) ) { const nextSessionEntry = { ...sessionEntry }; @@ -1493,7 +1551,7 @@ async function agentCommandInternal( await fallbackTrajectoryRecorder?.flush(); // Update token+model fields in the session store. - if (sessionStore && sessionKey) { + if (sessionStore && sessionKey && !suppressVisibleSessionEffects) { const { updateSessionStoreAfterAgentRun } = await loadSessionStoreRuntime(); await updateSessionStoreAfterAgentRun({ cfg, @@ -1524,28 +1582,42 @@ async function agentCommandInternal( if (transcriptPersistenceRunner === "cli" || embeddedAssistantGapFill) { let persistedCliTurnTranscript = false; try { + const transcriptSessionEntry: SessionEntry | undefined = suppressVisibleSessionEffects + ? { + ...(sessionEntry ?? { + sessionId, + updatedAt: Date.now(), + sessionStartedAt: Date.now(), + }), + sessionId, + sessionFile: attemptSessionFile, + } + : sessionEntry; sessionEntry = await attemptExecutionRuntime.persistCliTurnTranscript({ body, transcriptBody, result, sessionId, sessionKey: sessionKey ?? sessionId, - sessionEntry, - sessionStore, - storePath, + sessionEntry: transcriptSessionEntry, + sessionStore: suppressVisibleSessionEffects ? undefined : sessionStore, + storePath: suppressVisibleSessionEffects ? undefined : storePath, sessionAgentId, threadId: opts.threadId, sessionCwd: workspaceDir, config: cfg, embeddedAssistantGapFill, }); + if (suppressVisibleSessionEffects) { + sessionEntry = prepared.sessionEntry; + } persistedCliTurnTranscript = true; } catch (error) { log.warn( `Turn transcript persistence failed for ${sessionKey ?? sessionId}: ${error instanceof Error ? error.message : String(error)}`, ); } - if (persistedCliTurnTranscript) { + if (persistedCliTurnTranscript && !suppressVisibleSessionEffects) { sessionEntry = await ( await loadCliCompactionRuntime() ).runCliTurnCompactionLifecycle({ @@ -1579,6 +1651,7 @@ async function agentCommandInternal( opts.deliver === true && sessionStore && sessionKey && + !suppressVisibleSessionEffects && payloads.length > 0 && !isSubagentSessionKey(sessionKey) ) { @@ -1612,7 +1685,7 @@ async function agentCommandInternal( const { deliverAgentCommandResult } = await loadDeliveryRuntime(); const resolveFreshSessionEntryForDelivery = - sessionStore && sessionKey + sessionStore && sessionKey && !suppressVisibleSessionEffects ? async (): Promise => { const { loadSessionStore } = await loadSessionStoreRuntime(); const freshStore = loadSessionStore(storePath, { @@ -1648,7 +1721,12 @@ async function agentCommandInternal( ); // Phase 2: Clear pending delivery payload after successful delivery. - if (sessionStore && sessionKey && !isSubagentSessionKey(sessionKey)) { + if ( + sessionStore && + sessionKey && + !isSubagentSessionKey(sessionKey) && + !suppressVisibleSessionEffects + ) { const entry = sessionStore[sessionKey] ?? sessionEntry; const noPendingTextForThisRun = opts.deliver === true && diff --git a/src/agents/command/types.ts b/src/agents/command/types.ts index 5fba0daeab9..1384a0d278a 100644 --- a/src/agents/command/types.ts +++ b/src/agents/command/types.ts @@ -106,6 +106,8 @@ export type AgentCommandOpts = { bootstrapContextRunKind?: "default" | "heartbeat" | "cron"; internalEvents?: AgentInternalEvent[]; inputProvenance?: InputProvenance; + /** Internal runs can execute against a session without updating visible status/model/usage. */ + sessionEffects?: "visible" | "internal"; /** Visible source replies must be sent through the message tool when set. */ sourceReplyDeliveryMode?: SourceReplyDeliveryMode; /** Internal runs can omit the channel message tool entirely. */ diff --git a/src/agents/internal-session-effects.test.ts b/src/agents/internal-session-effects.test.ts new file mode 100644 index 00000000000..5d06897a7fb --- /dev/null +++ b/src/agents/internal-session-effects.test.ts @@ -0,0 +1,72 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import { withTempDir } from "../test-helpers/temp-dir.js"; +import { + prepareInternalSessionEffectsTranscript, + removeInternalSessionEffectsTranscript, +} from "./internal-session-effects.js"; + +const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; + +afterEach(() => { + if (ORIGINAL_STATE_DIR === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; + } +}); + +describe("prepareInternalSessionEffectsTranscript", () => { + it("creates a private transcript even without a visible source file", async () => { + await withTempDir({ prefix: "openclaw-internal-session-effects-" }, async (dir) => { + process.env.OPENCLAW_STATE_DIR = dir; + + const sessionFile = await prepareInternalSessionEffectsTranscript({ + runId: "run/with space", + }); + + expect(sessionFile).toBe(path.join(dir, "internal-agent-runs", "run_with_space.jsonl")); + expect(await fs.readFile(sessionFile, "utf8")).toBe(""); + expect((await fs.stat(sessionFile)).mode & 0o777).toBe(0o600); + + await removeInternalSessionEffectsTranscript(sessionFile); + + await expect(fs.stat(sessionFile)).rejects.toMatchObject({ code: "ENOENT" }); + }); + }); + + it("copies a visible source transcript into a private transcript", async () => { + await withTempDir({ prefix: "openclaw-internal-session-effects-" }, async (dir) => { + process.env.OPENCLAW_STATE_DIR = dir; + const sourceFile = path.join(dir, "visible-session.jsonl"); + await fs.writeFile(sourceFile, '{"role":"assistant","content":"done"}\n', { + mode: 0o644, + }); + + const sessionFile = await prepareInternalSessionEffectsTranscript({ + sessionFile: sourceFile, + runId: "run-copy", + }); + + expect(await fs.readFile(sessionFile, "utf8")).toBe( + '{"role":"assistant","content":"done"}\n', + ); + expect((await fs.stat(sessionFile)).mode & 0o777).toBe(0o600); + }); + }); + + it("creates an empty private transcript when the visible source is missing", async () => { + await withTempDir({ prefix: "openclaw-internal-session-effects-" }, async (dir) => { + process.env.OPENCLAW_STATE_DIR = dir; + + const sessionFile = await prepareInternalSessionEffectsTranscript({ + sessionFile: path.join(dir, "missing-session.jsonl"), + runId: "run-missing-source", + }); + + expect(await fs.readFile(sessionFile, "utf8")).toBe(""); + expect((await fs.stat(sessionFile)).mode & 0o777).toBe(0o600); + }); + }); +}); diff --git a/src/agents/internal-session-effects.ts b/src/agents/internal-session-effects.ts new file mode 100644 index 00000000000..b529ec138c3 --- /dev/null +++ b/src/agents/internal-session-effects.ts @@ -0,0 +1,50 @@ +import { promises as fs } from "node:fs"; +import path from "node:path"; +import { resolveStateDir } from "../config/paths.js"; + +export function resolveInternalSessionEffectsTranscriptPath(runId: string): string { + const safeRunId = runId.replace(/[^a-zA-Z0-9._-]/g, "_").slice(0, 120) || "run"; + return path.join(resolveStateDir(), "internal-agent-runs", `${safeRunId}.jsonl`); +} + +export async function prepareInternalSessionEffectsTranscript(params: { + sessionFile?: string; + runId: string; +}): Promise { + // Callers must persist this path in an owning lifecycle record and invoke + // removeInternalSessionEffectsTranscript once the recovered output is no longer needed. + const sessionFile = resolveInternalSessionEffectsTranscriptPath(params.runId); + await fs.mkdir(path.dirname(sessionFile), { recursive: true, mode: 0o700 }); + if (!params.sessionFile) { + await fs.writeFile(sessionFile, "", { mode: 0o600 }); + await fs.chmod(sessionFile, 0o600); + return sessionFile; + } + try { + const contents = await fs.readFile(params.sessionFile); + await fs.writeFile(sessionFile, contents, { mode: 0o600 }); + await fs.chmod(sessionFile, 0o600); + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") { + throw error; + } + await fs.writeFile(sessionFile, "", { mode: 0o600 }); + await fs.chmod(sessionFile, 0o600); + } + return sessionFile; +} + +export async function removeInternalSessionEffectsTranscript( + sessionFile: string | undefined, +): Promise { + const dir = path.join(resolveStateDir(), "internal-agent-runs"); + const resolved = sessionFile ? path.resolve(sessionFile) : ""; + if (!resolved || path.dirname(resolved) !== path.resolve(dir)) { + return; + } + try { + await fs.rm(resolved, { force: true }); + } catch { + // Best-effort privacy/disk cleanup; run cleanup must not fail on temp-file races. + } +} diff --git a/src/agents/subagent-announce-output.test.ts b/src/agents/subagent-announce-output.test.ts index 67f26b0ed64..9ec50676743 100644 --- a/src/agents/subagent-announce-output.test.ts +++ b/src/agents/subagent-announce-output.test.ts @@ -7,13 +7,20 @@ import { } from "./subagent-announce-output.js"; type CallGateway = typeof import("../gateway/call.js").callGateway; +type ReadSessionMessagesAsync = + typeof import("./subagent-announce.runtime.js").readSessionMessagesAsync; -function installOutputDeps(params: { messages: Array }) { +function installOutputDeps(params: { + messages: Array; + transcriptMessages?: Array; +}) { const callGateway = vi.fn(async () => ({ messages: params.messages })); + const readSessionMessagesAsync = vi.fn(async () => params.transcriptMessages ?? []); testing.setDepsForTest({ callGateway: callGateway as unknown as CallGateway, + readSessionMessagesAsync: readSessionMessagesAsync as unknown as ReadSessionMessagesAsync, }); - return { callGateway }; + return { callGateway, readSessionMessagesAsync }; } function sessionsYieldTurn(message = "Waiting for subagent completion.") { @@ -146,6 +153,56 @@ describe("readSubagentOutput", () => { await expect(readSubagentOutput("agent:main:subagent:child")).resolves.toBeUndefined(); }); + + it("reads recovered output from the private transcript before gateway history", async () => { + const deps = installOutputDeps({ + messages: [ + { + role: "assistant", + content: [{ type: "text", text: "stale visible output" }], + }, + ], + transcriptMessages: [ + { + role: "assistant", + stopReason: "stop", + content: [{ type: "text", text: "fresh recovered output" }], + }, + ], + }); + + await expect( + readSubagentOutput("agent:main:subagent:child", undefined, { + sessionFile: "/tmp/openclaw-internal-run.jsonl", + }), + ).resolves.toBe("fresh recovered output"); + expect(deps.readSessionMessagesAsync).toHaveBeenCalledWith( + "agent:main:subagent:child", + undefined, + "/tmp/openclaw-internal-run.jsonl", + { mode: "recent", maxMessages: 100, maxBytes: 1024 * 1024 }, + ); + expect(deps.callGateway).not.toHaveBeenCalled(); + }); + + it("does not read visible gateway history when a private transcript is empty", async () => { + const deps = installOutputDeps({ + messages: [ + { + role: "assistant", + content: [{ type: "text", text: "stale visible output" }], + }, + ], + transcriptMessages: [], + }); + + await expect( + readSubagentOutput("agent:main:subagent:child", undefined, { + sessionFile: "/tmp/openclaw-empty-internal-run.jsonl", + }), + ).resolves.toBeUndefined(); + expect(deps.callGateway).not.toHaveBeenCalled(); + }); }); describe("buildChildCompletionFindings", () => { @@ -155,7 +212,7 @@ describe("buildChildCompletionFindings", () => { childSessionKey: "agent:main:subagent:silent", task: "silent task", createdAt: 1, - frozenResultText: "ANNOUNCE_SKIP", + completion: { resultText: "ANNOUNCE_SKIP" }, outcome: { status: "ok" }, }, ]); @@ -169,7 +226,7 @@ describe("buildChildCompletionFindings", () => { childSessionKey: "agent:main:subagent:silent", task: "silent task", createdAt: 1, - frozenResultText: "ANNOUNCE_SKIP", + completion: { resultText: "ANNOUNCE_SKIP" }, outcome: { status: "error", error: "boom" }, }, ]); @@ -184,14 +241,14 @@ describe("buildChildCompletionFindings", () => { childSessionKey: "agent:main:subagent:silent", task: "silent task", createdAt: 1, - frozenResultText: "ANNOUNCE_SKIP", + completion: { resultText: "ANNOUNCE_SKIP" }, outcome: { status: "ok" }, }, { childSessionKey: "agent:main:subagent:visible", task: "visible task", createdAt: 2, - frozenResultText: "actual output", + completion: { resultText: "actual output" }, outcome: { status: "ok" }, }, ]); diff --git a/src/agents/subagent-announce-output.ts b/src/agents/subagent-announce-output.ts index 0f883348ef4..fa5a9188d82 100644 --- a/src/agents/subagent-announce-output.ts +++ b/src/agents/subagent-announce-output.ts @@ -9,6 +9,7 @@ import { callGateway, getRuntimeConfig, readSessionEntry, + readSessionMessagesAsync, resolveAgentIdFromSessionKey, resolveStorePath, } from "./subagent-announce.runtime.js"; @@ -22,6 +23,7 @@ type SubagentAnnounceOutputDeps = { callGateway: typeof callGateway; getRuntimeConfig: typeof getRuntimeConfig; readSessionEntry: typeof readSessionEntry; + readSessionMessagesAsync: typeof readSessionMessagesAsync; resolveAgentIdFromSessionKey: typeof resolveAgentIdFromSessionKey; resolveStorePath: typeof resolveStorePath; }; @@ -30,6 +32,7 @@ const defaultSubagentAnnounceOutputDeps: SubagentAnnounceOutputDeps = { callGateway, getRuntimeConfig, readSessionEntry, + readSessionMessagesAsync, resolveAgentIdFromSessionKey, resolveStorePath, }; @@ -194,13 +197,31 @@ function selectSubagentOutputText(snapshot: SubagentOutputSnapshot): string | un export async function readSubagentOutput( sessionKey: string, _outcome?: SubagentRunOutcome, + options?: { sessionFile?: string }, ): Promise { - const history = await subagentAnnounceOutputDeps.callGateway({ - method: "chat.history", - params: { sessionKey, limit: 100 }, - }); - const messages = Array.isArray(history?.messages) ? history.messages : []; - const snapshot = summarizeSubagentOutputHistory(messages); + let messages: unknown[] | undefined; + if (options?.sessionFile) { + const transcriptMessages = await subagentAnnounceOutputDeps.readSessionMessagesAsync( + sessionKey, + undefined, + options.sessionFile, + { + mode: "recent", + maxMessages: 100, + maxBytes: 1024 * 1024, + }, + ); + messages = transcriptMessages; + } + const history = + messages === undefined + ? await subagentAnnounceOutputDeps.callGateway({ + method: "chat.history", + params: { sessionKey, limit: 100 }, + }) + : undefined; + const sourceMessages = messages ?? (Array.isArray(history?.messages) ? history.messages : []); + const snapshot = summarizeSubagentOutputHistory(sourceMessages); const selected = selectSubagentOutputText(snapshot); if (selected?.trim()) { return selected; @@ -272,7 +293,7 @@ export function applySubagentWaitOutcome(params: { export async function captureSubagentCompletionReply( sessionKey: string, - options?: { waitForReply?: boolean; outcome?: SubagentRunOutcome }, + options?: { waitForReply?: boolean; outcome?: SubagentRunOutcome; sessionFile?: string }, ): Promise { return await captureSubagentCompletionReplyUsing({ sessionKey, @@ -280,7 +301,9 @@ export async function captureSubagentCompletionReply( maxWaitMs: isFastTestMode() ? 50 : 1_500, retryIntervalMs: isFastTestMode() ? FAST_TEST_RETRY_INTERVAL_MS : 100, readSubagentOutput: async (nextSessionKey) => - await readSubagentOutput(nextSessionKey, options?.outcome), + await readSubagentOutput(nextSessionKey, options?.outcome, { + sessionFile: options?.sessionFile, + }), }); } @@ -316,7 +339,9 @@ export function buildChildCompletionFindings( label?: string; createdAt: number; endedAt?: number; - frozenResultText?: string | null; + completion?: { + resultText?: string | null; + }; outcome?: SubagentRunOutcome; }>, ): string | undefined { @@ -331,7 +356,7 @@ export function buildChildCompletionFindings( const sections: string[] = []; for (const [index, child] of sorted.entries()) { - const resultText = child.frozenResultText?.trim(); + const resultText = child.completion?.resultText?.trim(); const outcome = describeSubagentOutcome(child.outcome); if ( child.outcome?.status === "ok" && @@ -367,7 +392,9 @@ export function dedupeLatestChildCompletionRows( label?: string; createdAt: number; endedAt?: number; - frozenResultText?: string | null; + completion?: { + resultText?: string | null; + }; outcome?: SubagentRunOutcome; }>, ) { @@ -390,7 +417,9 @@ export function filterCurrentDirectChildCompletionRows( label?: string; createdAt: number; endedAt?: number; - frozenResultText?: string | null; + completion?: { + resultText?: string | null; + }; outcome?: SubagentRunOutcome; }>, params: { diff --git a/src/agents/subagent-announce.live.test.ts b/src/agents/subagent-announce.live.test.ts index 091ec05e5c5..82513251e6f 100644 --- a/src/agents/subagent-announce.live.test.ts +++ b/src/agents/subagent-announce.live.test.ts @@ -314,11 +314,11 @@ describeLive("subagent announce live", () => { return listSubagentRunsForRequester(sessionKey).find( (run) => run.taskName === "issue_82913_child" && - run.frozenResultText?.includes(childToken) === true && + run.completion?.resultText?.includes(childToken) === true && run.outcome?.status === "ok", ); }); - expect(completedRunBeforeDelivery.completionAnnouncedAt).toBeUndefined(); + expect(completedRunBeforeDelivery.delivery?.announcedAt).toBeUndefined(); expect(parentObservedAt).toBeUndefined(); const parent = await initialRequest; @@ -330,14 +330,14 @@ describeLive("subagent announce live", () => { listSubagentRunsForRequester(sessionKey).find( (run) => run.runId === completedRunBeforeDelivery.runId && - typeof run.completionEnqueuedAt === "number" && - typeof run.completionDeliveredAt === "number" && - typeof run.completionAnnouncedAt === "number", + typeof run.delivery?.enqueuedAt === "number" && + typeof run.delivery?.deliveredAt === "number" && + typeof run.delivery?.announcedAt === "number", ), ); - const enqueuedAt = completedRun.completionEnqueuedAt!; - const deliveredAt = completedRun.completionDeliveredAt!; - const announcedAt = completedRun.completionAnnouncedAt!; + const enqueuedAt = completedRun.delivery?.enqueuedAt ?? 0; + const deliveredAt = completedRun.delivery?.deliveredAt ?? 0; + const announcedAt = completedRun.delivery?.announcedAt ?? 0; const enqueuedToDeliveredMs = deliveredAt - enqueuedAt; const announcedToParentObservedMs = Math.abs(parentObservedAt - announcedAt); console.log( @@ -352,7 +352,7 @@ describeLive("subagent announce live", () => { announcedToParentObservedMs, })}`, ); - expect(completedRun.completionAnnouncedAt).toBe(deliveredAt); + expect(completedRun.delivery?.announcedAt).toBe(deliveredAt); expect(enqueuedToDeliveredMs).toBeGreaterThan(10_000); expect(announcedToParentObservedMs).toBeLessThan(20_000); }, @@ -499,12 +499,12 @@ describeLive("subagent announce live", () => { return listSubagentRunsForRequester(sessionKey).find( (run) => run.taskName === "steered_child" && - run.frozenResultText?.includes(childToken) === true && + run.completion?.resultText?.includes(childToken) === true && run.outcome?.status === "ok", ); }); expect(steeredRun.endedReason).toBe("subagent-complete"); - expect(steeredRun.lastAnnounceDeliveryError).toBeUndefined(); + expect(steeredRun.delivery?.lastError).toBeUndefined(); await waitFor("in-process subagent completion agent dispatch start", () => { if (initialError) { @@ -659,7 +659,8 @@ describeLive("subagent announce live", () => { const completed = childTokens.every((childToken) => runs.some( (run) => - run.frozenResultText?.includes(childToken) === true && run.outcome?.status === "ok", + run.completion?.resultText?.includes(childToken) === true && + run.outcome?.status === "ok", ), ); return completed ? runs : undefined; @@ -667,7 +668,9 @@ describeLive("subagent announce live", () => { expect(completedRuns).toHaveLength(3); for (const childToken of childTokens) { - expect(completedRuns.some((run) => run.frozenResultText?.includes(childToken))).toBe(true); + expect(completedRuns.some((run) => run.completion?.resultText?.includes(childToken))).toBe( + true, + ); } const parent = await initialRequest; diff --git a/src/agents/subagent-announce.runtime.ts b/src/agents/subagent-announce.runtime.ts index 6c799ac7957..9bc2beb4ba0 100644 --- a/src/agents/subagent-announce.runtime.ts +++ b/src/agents/subagent-announce.runtime.ts @@ -6,5 +6,6 @@ export { resolveStorePath, } from "../config/sessions.js"; export { callGateway } from "../gateway/call.js"; +export { readSessionMessagesAsync } from "../gateway/session-utils.fs.js"; export { dispatchGatewayMethodInProcess } from "../gateway/server-plugins.js"; export { isEmbeddedPiRunActive, waitForEmbeddedPiRunEnd } from "./pi-embedded-runner/runs.js"; diff --git a/src/agents/subagent-delivery-state.test.ts b/src/agents/subagent-delivery-state.test.ts new file mode 100644 index 00000000000..06ae1376d6c --- /dev/null +++ b/src/agents/subagent-delivery-state.test.ts @@ -0,0 +1,120 @@ +import { describe, expect, it } from "vitest"; +import { normalizeSubagentRunState } from "./subagent-delivery-state.js"; +import type { LegacySubagentRunRecord } from "./subagent-delivery-state.js"; +import type { SubagentRunRecord } from "./subagent-registry.types.js"; + +function baseRun(overrides: Partial = {}): LegacySubagentRunRecord { + return { + runId: "run-1", + childSessionKey: "agent:main:subagent:child", + requesterSessionKey: "agent:main:parent", + requesterDisplayKey: "agent:main:parent", + controllerSessionKey: "agent:main:parent", + task: "inspect", + cleanup: "keep", + spawnMode: "run", + createdAt: 100, + startedAt: 100, + expectsCompletionMessage: true, + ...overrides, + }; +} + +describe("normalizeSubagentRunState", () => { + it("migrates legacy pending delivery fields into nested completion and delivery state", () => { + const entry = normalizeSubagentRunState( + baseRun({ + frozenResultText: "child output", + frozenResultCapturedAt: 200, + pendingFinalDelivery: true, + pendingFinalDeliveryCreatedAt: 210, + pendingFinalDeliveryLastAttemptAt: 220, + pendingFinalDeliveryAttemptCount: 3, + pendingFinalDeliveryLastError: "sink unavailable", + pendingFinalDeliveryPayload: { + requesterSessionKey: "agent:main:parent", + requesterDisplayKey: "agent:main:parent", + childSessionKey: "agent:main:subagent:child", + childRunId: "run-1", + task: "inspect", + startedAt: 100, + expectsCompletionMessage: true, + frozenResultText: "child output", + }, + }), + ) as SubagentRunRecord & { pendingFinalDelivery?: boolean; frozenResultText?: string }; + + expect(entry.completion).toMatchObject({ + required: true, + resultText: "child output", + capturedAt: 200, + }); + expect(entry.delivery).toMatchObject({ + status: "pending", + createdAt: 210, + lastAttemptAt: 220, + attemptCount: 3, + lastError: "sink unavailable", + payload: expect.objectContaining({ childRunId: "run-1" }), + }); + expect(entry.pendingFinalDelivery).toBeUndefined(); + expect(entry.frozenResultText).toBeUndefined(); + }); + + it("merges partial nested state with legacy fields before stripping legacy fields", () => { + const entry = normalizeSubagentRunState( + baseRun({ + completion: { required: true }, + delivery: { status: "not_required" }, + pendingFinalDelivery: true, + pendingFinalDeliveryAttemptCount: 2, + lastAnnounceRetryAt: 240, + frozenResultText: "legacy result", + }), + ) as SubagentRunRecord & { pendingFinalDelivery?: boolean; lastAnnounceRetryAt?: number }; + + expect(entry.completion?.resultText).toBe("legacy result"); + expect(entry.delivery).toMatchObject({ + status: "pending", + attemptCount: 2, + lastAttemptAt: 240, + }); + expect(entry.pendingFinalDelivery).toBeUndefined(); + expect(entry.lastAnnounceRetryAt).toBeUndefined(); + }); + + it("clears stale cleanupHandled locks for unfinished restored cleanup", () => { + const entry = normalizeSubagentRunState(baseRun({ cleanupHandled: true })); + + expect(entry.cleanupHandled).toBe(false); + }); + + it("clears stale cleanupHandled locks after delivered notification if cleanup did not finish", () => { + const entry = normalizeSubagentRunState( + baseRun({ + cleanupHandled: true, + delivery: { + status: "delivered", + announcedAt: 400, + }, + }), + ); + + expect(entry.cleanupHandled).toBe(false); + }); + + it("keeps discarded terminal delivery dormant across restart", () => { + const entry = normalizeSubagentRunState( + baseRun({ + cleanupHandled: true, + delivery: { + status: "discarded", + discardedAt: 400, + discardReason: "expired", + }, + }), + ); + + expect(entry.cleanupHandled).toBe(true); + }); +}); diff --git a/src/agents/subagent-delivery-state.ts b/src/agents/subagent-delivery-state.ts new file mode 100644 index 00000000000..b76b43f2185 --- /dev/null +++ b/src/agents/subagent-delivery-state.ts @@ -0,0 +1,258 @@ +import type { + PendingFinalDeliveryPayload, + SubagentCompletionDeliveryState, + SubagentCompletionState, + SubagentExecutionState, + SubagentRunRecord, +} from "./subagent-registry.types.js"; + +export type LegacySubagentRunRecord = SubagentRunRecord & { + announceRetryCount?: number; + lastAnnounceRetryAt?: number; + lastAnnounceDeliveryError?: string; + frozenResultText?: string | null; + frozenResultCapturedAt?: number; + fallbackFrozenResultText?: string | null; + fallbackFrozenResultCapturedAt?: number; + pendingFinalDelivery?: boolean; + pendingFinalDeliveryCreatedAt?: number; + pendingFinalDeliveryLastAttemptAt?: number; + pendingFinalDeliveryAttemptCount?: number; + pendingFinalDeliveryLastError?: string | null; + pendingFinalDeliveryPayload?: PendingFinalDeliveryPayload; + deliverySuspendedAt?: number; + deliverySuspendedReason?: "retry-limit" | "expiry"; + deliveryDiscardedAt?: number; + deliveryDiscardReason?: "expired" | "pressure-pruned"; + deliveryDiscardedPayloadSummary?: SubagentCompletionDeliveryState["discardedPayloadSummary"]; + completionEnqueuedAt?: number; + completionDeliveredAt?: number; + completionAnnouncedAt?: number; + lastAnnounceDropReason?: SubagentCompletionDeliveryState["lastDropReason"]; +}; + +export function normalizeSubagentRunState(entry: SubagentRunRecord): SubagentRunRecord { + const legacy = entry as LegacySubagentRunRecord; + entry.execution = mergeExecutionState(entry.execution, buildExecutionState(entry)); + entry.completion = mergeCompletionState(entry.completion, buildCompletionState(entry, legacy)); + entry.delivery = mergeDeliveryState(entry, entry.delivery, buildDeliveryState(entry, legacy)); + // cleanupHandled is an in-process lock; after restart, unfinished cleanup must retry. + if ( + entry.cleanupHandled === true && + typeof entry.cleanupCompletedAt !== "number" && + entry.delivery?.status !== "discarded" + ) { + entry.cleanupHandled = false; + } + delete legacy.announceRetryCount; + delete legacy.lastAnnounceRetryAt; + delete legacy.lastAnnounceDeliveryError; + delete legacy.frozenResultText; + delete legacy.frozenResultCapturedAt; + delete legacy.fallbackFrozenResultText; + delete legacy.fallbackFrozenResultCapturedAt; + delete legacy.pendingFinalDelivery; + delete legacy.pendingFinalDeliveryCreatedAt; + delete legacy.pendingFinalDeliveryLastAttemptAt; + delete legacy.pendingFinalDeliveryAttemptCount; + delete legacy.pendingFinalDeliveryLastError; + delete legacy.pendingFinalDeliveryPayload; + delete legacy.deliverySuspendedAt; + delete legacy.deliverySuspendedReason; + delete legacy.deliveryDiscardedAt; + delete legacy.deliveryDiscardReason; + delete legacy.deliveryDiscardedPayloadSummary; + delete legacy.completionEnqueuedAt; + delete legacy.completionDeliveredAt; + delete legacy.completionAnnouncedAt; + delete legacy.lastAnnounceDropReason; + return entry; +} + +function mergeExecutionState( + current: SubagentExecutionState | undefined, + restored: SubagentExecutionState, +): SubagentExecutionState { + return current ? { ...restored, ...current } : restored; +} + +function mergeCompletionState( + current: SubagentCompletionState | undefined, + restored: SubagentCompletionState, +): SubagentCompletionState { + if (!current) { + return restored; + } + return { + ...restored, + ...current, + required: current.required ?? restored.required, + }; +} + +function mergeDeliveryState( + entry: SubagentRunRecord, + current: SubagentCompletionDeliveryState | undefined, + restored: SubagentCompletionDeliveryState, +): SubagentCompletionDeliveryState { + if (!current) { + return restored; + } + const status = + current.status === "not_required" && + entry.expectsCompletionMessage !== false && + restored.status !== "not_required" + ? restored.status + : current.status; + return { + ...restored, + ...current, + status, + payload: current.payload ?? restored.payload, + createdAt: current.createdAt ?? restored.createdAt, + enqueuedAt: current.enqueuedAt ?? restored.enqueuedAt, + deliveredAt: current.deliveredAt ?? restored.deliveredAt, + announcedAt: current.announcedAt ?? restored.announcedAt, + lastAttemptAt: current.lastAttemptAt ?? restored.lastAttemptAt, + attemptCount: current.attemptCount ?? restored.attemptCount, + lastError: current.lastError ?? restored.lastError, + suspendedAt: current.suspendedAt ?? restored.suspendedAt, + suspendedReason: current.suspendedReason ?? restored.suspendedReason, + discardedAt: current.discardedAt ?? restored.discardedAt, + discardReason: current.discardReason ?? restored.discardReason, + discardedPayloadSummary: current.discardedPayloadSummary ?? restored.discardedPayloadSummary, + lastDropReason: current.lastDropReason ?? restored.lastDropReason, + }; +} + +function buildExecutionState(entry: SubagentRunRecord): SubagentExecutionState { + if (typeof entry.endedAt === "number") { + return { + status: "terminal", + startedAt: entry.startedAt, + endedAt: entry.endedAt, + outcome: entry.outcome, + }; + } + return { + status: "running", + startedAt: entry.startedAt, + }; +} + +function buildCompletionState( + entry: SubagentRunRecord, + legacy: LegacySubagentRunRecord, +): SubagentCompletionState { + return { + required: entry.expectsCompletionMessage === true, + ...(legacy.frozenResultText !== undefined ? { resultText: legacy.frozenResultText } : {}), + ...(typeof legacy.frozenResultCapturedAt === "number" + ? { capturedAt: legacy.frozenResultCapturedAt } + : {}), + ...(legacy.fallbackFrozenResultText !== undefined + ? { fallbackResultText: legacy.fallbackFrozenResultText } + : {}), + ...(typeof legacy.fallbackFrozenResultCapturedAt === "number" + ? { fallbackCapturedAt: legacy.fallbackFrozenResultCapturedAt } + : {}), + }; +} + +function buildDeliveryState( + entry: SubagentRunRecord, + legacy: LegacySubagentRunRecord, +): SubagentCompletionDeliveryState { + if (entry.expectsCompletionMessage === false) { + return { status: "not_required" }; + } + if (typeof legacy.deliveryDiscardedAt === "number") { + return { + status: "discarded", + discardedAt: legacy.deliveryDiscardedAt, + discardReason: legacy.deliveryDiscardReason, + discardedPayloadSummary: legacy.deliveryDiscardedPayloadSummary, + }; + } + if (typeof legacy.deliverySuspendedAt === "number") { + return { + status: "suspended", + payload: legacy.pendingFinalDeliveryPayload, + createdAt: legacy.pendingFinalDeliveryCreatedAt, + lastAttemptAt: legacy.pendingFinalDeliveryLastAttemptAt ?? legacy.lastAnnounceRetryAt, + attemptCount: legacy.pendingFinalDeliveryAttemptCount ?? legacy.announceRetryCount, + lastError: legacy.pendingFinalDeliveryLastError ?? legacy.lastAnnounceDeliveryError ?? null, + suspendedAt: legacy.deliverySuspendedAt, + suspendedReason: legacy.deliverySuspendedReason, + lastDropReason: legacy.lastAnnounceDropReason, + }; + } + if (typeof legacy.completionAnnouncedAt === "number") { + return { + status: "delivered", + enqueuedAt: legacy.completionEnqueuedAt, + deliveredAt: legacy.completionDeliveredAt ?? legacy.completionAnnouncedAt, + announcedAt: legacy.completionAnnouncedAt, + lastDropReason: legacy.lastAnnounceDropReason, + }; + } + if (legacy.pendingFinalDelivery === true || legacy.pendingFinalDeliveryPayload) { + return { + status: "pending", + payload: legacy.pendingFinalDeliveryPayload, + createdAt: legacy.pendingFinalDeliveryCreatedAt, + lastAttemptAt: legacy.pendingFinalDeliveryLastAttemptAt ?? legacy.lastAnnounceRetryAt, + attemptCount: legacy.pendingFinalDeliveryAttemptCount ?? legacy.announceRetryCount, + lastError: legacy.pendingFinalDeliveryLastError ?? legacy.lastAnnounceDeliveryError ?? null, + enqueuedAt: legacy.completionEnqueuedAt, + deliveredAt: legacy.completionDeliveredAt, + lastDropReason: legacy.lastAnnounceDropReason, + }; + } + return { + status: typeof entry.endedAt === "number" ? "pending" : "not_required", + enqueuedAt: legacy.completionEnqueuedAt, + deliveredAt: legacy.completionDeliveredAt, + lastAttemptAt: legacy.lastAnnounceRetryAt, + attemptCount: legacy.announceRetryCount, + lastError: legacy.lastAnnounceDeliveryError ?? null, + lastDropReason: legacy.lastAnnounceDropReason, + }; +} + +export function ensureCompletionState(entry: SubagentRunRecord): SubagentCompletionState { + entry.completion ??= { + required: entry.expectsCompletionMessage === true, + }; + return entry.completion; +} + +export function ensureDeliveryState(entry: SubagentRunRecord): SubagentCompletionDeliveryState { + entry.delivery ??= { + status: entry.expectsCompletionMessage === false ? "not_required" : "pending", + }; + return entry.delivery; +} + +export function clearDeliveryState(entry: SubagentRunRecord): void { + entry.delivery = { + status: entry.expectsCompletionMessage === false ? "not_required" : "pending", + }; +} + +export function isDeliverySuspended(entry: SubagentRunRecord): boolean { + return entry.delivery?.status === "suspended" && typeof entry.delivery.suspendedAt === "number"; +} + +export function getDeliveryAttemptCount(entry: SubagentRunRecord): number { + return entry.delivery?.attemptCount ?? 0; +} + +export function getDeliveryLastAttemptAt(entry: SubagentRunRecord): number | undefined { + return entry.delivery?.lastAttemptAt; +} + +export function getDeliveryLastError(entry: SubagentRunRecord): string | undefined { + const error = entry.delivery?.lastError; + return typeof error === "string" && error.trim() ? error : undefined; +} diff --git a/src/agents/subagent-orphan-recovery.test.ts b/src/agents/subagent-orphan-recovery.test.ts index 410ede10af7..997b6021ab5 100644 --- a/src/agents/subagent-orphan-recovery.test.ts +++ b/src/agents/subagent-orphan-recovery.test.ts @@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import * as sessions from "../config/sessions.js"; import * as gateway from "../gateway/call.js"; import * as sessionUtils from "../gateway/session-utils.fs.js"; +import { resolveInternalSessionEffectsTranscriptPath } from "./internal-session-effects.js"; import * as announceDelivery from "./subagent-announce-delivery.js"; import { recoverOrphanedSubagentSessions, @@ -178,6 +179,12 @@ describe("subagent-orphan-recovery", () => { expect(replaceParams.previousRunId).toBe("run-1"); expect(replaceParams.nextRunId).toBe("test-run-id"); expect(replaceParams.fallback).toBe(run); + expect(replaceParams.transcriptFile).toBe( + resolveInternalSessionEffectsTranscriptPath("test-run-id"), + ); + expect(replaceParams.transcriptFile).not.toBe( + resolveInternalSessionEffectsTranscriptPath(params.idempotencyKey as string), + ); }); it("skips sessions that are not aborted", async () => { @@ -532,39 +539,22 @@ describe("subagent-orphan-recovery", () => { expect(message).toContain("config changes from your previous run were already applied"); }); - it("announces recovery-in-progress once when a later retry is attempting resume", async () => { + it("does not send parent-visible recovery-progress announcements on retry", async () => { mockSingleAbortedSession(); const activeRuns = createActiveRuns(createTestRunRecord()); - const notifiedRecoverySessionKeys = new Set(); await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns, - attemptNumber: 2, - maxAttempts: 4, - notifiedRecoverySessionKeys, }); - expect(announceDelivery.deliverSubagentAnnouncement).toHaveBeenCalledOnce(); - const announcement = requireRecord( - firstCallParam( - vi.mocked(announceDelivery.deliverSubagentAnnouncement).mock.calls, - "recovery announcement", - ), - "recovery announcement params", - ); - expect(announcement.requesterSessionKey).toBe("agent:main:quietchat:direct:+1234567890"); - expect(announcement.triggerMessage).toContain("Automatic recovery is already in progress"); - expect(notifiedRecoverySessionKeys).toEqual(new Set(["agent:main:subagent:test-session-1"])); + expect(announceDelivery.deliverSubagentAnnouncement).not.toHaveBeenCalled(); await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns, - attemptNumber: 3, - maxAttempts: 4, - notifiedRecoverySessionKeys, }); - expect(announceDelivery.deliverSubagentAnnouncement).toHaveBeenCalledOnce(); + expect(announceDelivery.deliverSubagentAnnouncement).not.toHaveBeenCalled(); }); it("prevents duplicate resume when updateSessionStore fails", async () => { diff --git a/src/agents/subagent-orphan-recovery.ts b/src/agents/subagent-orphan-recovery.ts index ec4e0337daf..619f2771b9f 100644 --- a/src/agents/subagent-orphan-recovery.ts +++ b/src/agents/subagent-orphan-recovery.ts @@ -1,10 +1,11 @@ /** - * Post-restart orphan recovery for subagent sessions. + * Post-restart interrupted-run resume for subagent sessions. * * After a SIGUSR1 gateway reload aborts in-flight subagent LLM calls, - * this module scans for orphaned sessions (those with `abortedLastRun: true` + * this module scans for interrupted sessions (those with `abortedLastRun: true` * that are still tracked as active in the subagent registry) and sends a - * synthetic resume message to restart their work. + * synthetic resume message to restart their work. Parent notification is handled + * separately by completion delivery after the child reaches a terminal result. * * @see https://github.com/openclaw/openclaw/issues/47711 */ @@ -22,13 +23,7 @@ import { callGateway } from "../gateway/call.js"; import { readSessionMessagesAsync } from "../gateway/session-utils.fs.js"; import { formatErrorMessage } from "../infra/errors.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; -import { buildAnnounceIdempotencyKey } from "./announce-idempotency.js"; -import { - deliverSubagentAnnouncement, - isInternalAnnounceRequesterSession, - loadRequesterSessionEntry, -} from "./subagent-announce-delivery.js"; -import { resolveAnnounceOrigin } from "./subagent-announce-origin.js"; +import { resolveInternalSessionEffectsTranscriptPath } from "./internal-session-effects.js"; import { evaluateSubagentRecoveryGate, markSubagentRecoveryAttempt, @@ -40,12 +35,12 @@ import { } from "./subagent-registry-steer-runtime.js"; import type { SubagentRunRecord } from "./subagent-registry.types.js"; -const log = createSubsystemLogger("subagent-orphan-recovery"); +const log = createSubsystemLogger("subagent-interrupted-resume"); /** Delay before attempting recovery to let the gateway finish bootstrapping. */ const DEFAULT_RECOVERY_DELAY_MS = 5_000; -function isRestartAbortedTimeoutRun( +function isLegacyRestartInterruptedTimeout( runRecord: SubagentRunRecord, entry: SessionEntry | undefined, ): boolean { @@ -57,6 +52,21 @@ function isRestartAbortedTimeoutRun( ); } +function reclassifyLegacyRestartInterruptedRun(runRecord: SubagentRunRecord): void { + const interruptedAt = runRecord.endedAt; + runRecord.execution = { + ...runRecord.execution, + status: "interrupted", + interruptedAt, + interruptionReason: "gateway-restart", + endedAt: undefined, + outcome: undefined, + }; + runRecord.endedAt = undefined; + runRecord.endedReason = undefined; + runRecord.outcome = undefined; +} + /** * Build the resume message for an orphaned subagent. */ @@ -76,75 +86,6 @@ function buildResumeMessage(task: string, lastHumanMessage?: string): string { return message; } -function buildRecoveryProgressPrompt(params: { - task: string; - attemptNumber: number; - maxAttempts: number; -}): string { - const maxTaskLen = 160; - const taskLabel = - params.task.length > maxTaskLen ? `${params.task.slice(0, maxTaskLen)}...` : params.task; - return ( - `A spawned subagent task was interrupted by a gateway restart or connection loss. ` + - `Automatic recovery is already in progress for "${taskLabel}" ` + - `(retry ${params.attemptNumber}/${params.maxAttempts}). ` + - `Send one brief update now in your normal voice: say the task was interrupted, ` + - `you are automatically resuming/retrying it, and you will report back when it either continues or truly fails. ` + - `Do not say the task has failed.` - ); -} - -async function announceRecoveryInProgress(params: { - runRecord: SubagentRunRecord; - attemptNumber: number; - maxAttempts: number; -}): Promise { - const requesterSessionKey = params.runRecord.requesterSessionKey?.trim(); - if (!requesterSessionKey) { - return false; - } - - const requesterOrigin = params.runRecord.requesterOrigin; - const requesterIsSubagent = isInternalAnnounceRequesterSession(requesterSessionKey); - let directOrigin = requesterOrigin; - if (!requesterIsSubagent) { - const { entry } = loadRequesterSessionEntry(requesterSessionKey); - directOrigin = resolveAnnounceOrigin(entry, requesterOrigin); - } - - const prompt = buildRecoveryProgressPrompt({ - task: params.runRecord.label || params.runRecord.task, - attemptNumber: params.attemptNumber, - maxAttempts: params.maxAttempts, - }); - - try { - const delivery = await deliverSubagentAnnouncement({ - requesterSessionKey, - announceId: `${params.runRecord.runId}:recovery-progress`, - triggerMessage: prompt, - steerMessage: prompt, - summaryLine: params.runRecord.label || params.runRecord.task, - requesterSessionOrigin: requesterOrigin, - requesterOrigin, - completionDirectOrigin: requesterOrigin, - directOrigin, - sourceSessionKey: params.runRecord.childSessionKey, - sourceTool: "subagent_orphan_recovery", - targetRequesterSessionKey: requesterSessionKey, - requesterIsSubagent, - expectsCompletionMessage: false, - bestEffortDeliver: true, - directIdempotencyKey: buildAnnounceIdempotencyKey( - `${params.runRecord.runId}:recovery-progress`, - ), - }); - return delivery.delivered; - } catch { - return false; - } -} - function extractMessageText(msg: unknown): string | undefined { if (!msg || typeof msg !== "object") { return undefined; @@ -187,14 +128,23 @@ async function resumeOrphanedSession(params: { } try { + const idempotencyKey = crypto.randomUUID(); const result = await callGateway<{ runId: string }>({ method: "agent", params: { message: resumeMessage, sessionKey: params.sessionKey, - idempotencyKey: crypto.randomUUID(), + idempotencyKey, deliver: false, lane: "subagent", + inputProvenance: { + kind: "inter_session", + sourceSessionKey: params.originalRun.requesterSessionKey, + sourceChannel: "internal", + sourceTool: "subagent_interrupted_resume", + }, + sessionEffects: "internal", + suppressPromptPersistence: true, }, timeoutMs: 10_000, }); @@ -202,6 +152,7 @@ async function resumeOrphanedSession(params: { previousRunId: params.originalRunId, nextRunId: result.runId, fallback: params.originalRun, + transcriptFile: resolveInternalSessionEffectsTranscriptPath(result.runId), }); if (!remapped) { log.warn( @@ -233,12 +184,6 @@ export async function recoverOrphanedSubagentSessions(params: { getActiveRuns: () => Map; /** Persisted across retries so already-resumed sessions are not resumed again. */ resumedSessionKeys?: Set; - /** Human-visible attempt number for this recovery pass. */ - attemptNumber?: number; - /** Total recovery attempts before giving up. */ - maxAttempts?: number; - /** Persisted across retries so recovery-in-progress notices stay deduped. */ - notifiedRecoverySessionKeys?: Set; }): Promise<{ recovered: number; failed: number; @@ -252,9 +197,6 @@ export async function recoverOrphanedSubagentSessions(params: { failedRuns: [] as Array<{ runId: string; childSessionKey: string; error?: string }>, }; const resumedSessionKeys = params.resumedSessionKeys ?? new Set(); - const attemptNumber = Math.max(1, params.attemptNumber ?? 1); - const maxAttempts = Math.max(attemptNumber, params.maxAttempts ?? attemptNumber); - const notifiedRecoverySessionKeys = params.notifiedRecoverySessionKeys ?? new Set(); const configChangePattern = /openclaw\.json|openclaw gateway restart|config\.patch/i; try { @@ -293,13 +235,14 @@ export async function recoverOrphanedSubagentSessions(params: { continue; } - // Restart-aborted subagents can be marked ended with a timeout outcome - // before the gateway comes back up to resume them. - if ( - typeof runRecord.endedAt === "number" && - runRecord.endedAt > 0 && - !isRestartAbortedTimeoutRun(runRecord, entry) - ) { + if (isLegacyRestartInterruptedTimeout(runRecord, entry)) { + reclassifyLegacyRestartInterruptedRun(runRecord); + } + + // Terminal child outcomes are immutable. Restart resume only applies to + // non-terminal interrupted execution; delivery retry handles terminal + // child results separately. + if (typeof runRecord.endedAt === "number" && runRecord.endedAt > 0) { result.skipped++; continue; } @@ -371,17 +314,6 @@ export async function recoverOrphanedSubagentSessions(params: { return typeof text === "string" && configChangePattern.test(text); }); - if (attemptNumber > 1 && !notifiedRecoverySessionKeys.has(childSessionKey)) { - const notified = await announceRecoveryInProgress({ - runRecord, - attemptNumber, - maxAttempts, - }); - if (notified) { - notifiedRecoverySessionKeys.add(childSessionKey); - } - } - // Resume the session with the original task context. // We intentionally do NOT clear abortedLastRun before attempting // the resume — if callGateway fails (e.g. gateway still booting), @@ -492,16 +424,11 @@ export function scheduleOrphanRecovery(params: { const maxRetries = params.maxRetries ?? MAX_RECOVERY_RETRIES; const resumedSessionKeys = new Set(); - const notifiedRecoverySessionKeys = new Set(); - const attemptRecovery = (attempt: number, delay: number) => { setTimeout(() => { void recoverOrphanedSubagentSessions({ ...params, resumedSessionKeys, - attemptNumber: attempt + 1, - maxAttempts: maxRetries + 1, - notifiedRecoverySessionKeys, }) .then((result) => { if (result.failed > 0 && attempt < maxRetries) { diff --git a/src/agents/subagent-registry-cleanup.test.ts b/src/agents/subagent-registry-cleanup.test.ts index c816503b9e0..1bcbf71564d 100644 --- a/src/agents/subagent-registry-cleanup.test.ts +++ b/src/agents/subagent-registry-cleanup.test.ts @@ -68,7 +68,10 @@ describe("resolveDeferredCleanupDecision", () => { it("uses retry backoff for completion-message flows once descendants are settled", () => { const decision = resolveDecision({ - entry: makeEntry({ expectsCompletionMessage: true, announceRetryCount: 1 }), + entry: makeEntry({ + expectsCompletionMessage: true, + delivery: { status: "pending", attemptCount: 1 }, + }), activeDescendantRuns: 0, resolveAnnounceRetryDelayMs: (retryCount) => retryCount * 1_000, }); @@ -78,7 +81,10 @@ describe("resolveDeferredCleanupDecision", () => { it("uses retry backoff for non-completion flows so cleanup can settle after announce failures", () => { const decision = resolveDecision({ - entry: makeEntry({ expectsCompletionMessage: false, announceRetryCount: 1 }), + entry: makeEntry({ + expectsCompletionMessage: false, + delivery: { status: "not_required", attemptCount: 1 }, + }), activeDescendantRuns: 0, resolveAnnounceRetryDelayMs: (retryCount) => retryCount * 1_000, }); diff --git a/src/agents/subagent-registry-cleanup.ts b/src/agents/subagent-registry-cleanup.ts index f892a3686d9..81c8b69905e 100644 --- a/src/agents/subagent-registry-cleanup.ts +++ b/src/agents/subagent-registry-cleanup.ts @@ -1,3 +1,4 @@ +import { getDeliveryAttemptCount } from "./subagent-delivery-state.js"; import { SUBAGENT_ENDED_REASON_COMPLETE, type SubagentLifecycleEndedReason, @@ -51,7 +52,7 @@ export function resolveDeferredCleanupDecision(params: { return { kind: "defer-descendants", delayMs: params.deferDescendantDelayMs }; } - const retryCount = (params.entry.announceRetryCount ?? 0) + 1; + const retryCount = getDeliveryAttemptCount(params.entry) + 1; const expiryExceeded = isCompletionMessageFlow ? completionHardExpiryExceeded : endedAgo > params.announceExpiryMs; diff --git a/src/agents/subagent-registry-helpers.test.ts b/src/agents/subagent-registry-helpers.test.ts index 99a1221b7c9..331ad06d7eb 100644 --- a/src/agents/subagent-registry-helpers.test.ts +++ b/src/agents/subagent-registry-helpers.test.ts @@ -65,8 +65,11 @@ describe("logAnnounceGiveUp", () => { const logSpy = vi.spyOn(defaultRuntime, "log").mockImplementation(() => {}); const entry = createRunEntry({ endedAt: 4_000, - announceRetryCount: 3, - lastAnnounceDeliveryError: "direct-primary: routed-dispatch-did-not-queue-final", + delivery: { + status: "failed", + attemptCount: 3, + lastError: "direct-primary: routed-dispatch-did-not-queue-final", + }, }); logAnnounceGiveUp(entry, "retry-limit"); @@ -80,7 +83,10 @@ describe("logAnnounceGiveUp", () => { it("normalizes multiline delivery errors onto one gateway log line", () => { const logSpy = vi.spyOn(defaultRuntime, "log").mockImplementation(() => {}); const entry = createRunEntry({ - lastAnnounceDeliveryError: "gateway timeout\nphase: routed dispatch failed", + delivery: { + status: "failed", + lastError: "gateway timeout\nphase: routed dispatch failed", + }, }); logAnnounceGiveUp(entry, "expiry"); diff --git a/src/agents/subagent-registry-helpers.ts b/src/agents/subagent-registry-helpers.ts index 6793283ce0b..cbbcec1f6bf 100644 --- a/src/agents/subagent-registry-helpers.ts +++ b/src/agents/subagent-registry-helpers.ts @@ -13,6 +13,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js"; import { defaultRuntime } from "../runtime.js"; import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; import { withSubagentOutcomeTiming } from "./subagent-announce-output.js"; +import { getDeliveryAttemptCount, getDeliveryLastError } from "./subagent-delivery-state.js"; import { SUBAGENT_ENDED_REASON_ERROR } from "./subagent-lifecycle-events.js"; import { shouldUpdateRunOutcome } from "./subagent-registry-completion.js"; import type { SubagentRunRecord } from "./subagent-registry.types.js"; @@ -71,12 +72,13 @@ function formatAnnounceGiveUpLogField(value: string): string { } export function logAnnounceGiveUp(entry: SubagentRunRecord, reason: "retry-limit" | "expiry") { - const retryCount = entry.announceRetryCount ?? 0; + const retryCount = getDeliveryAttemptCount(entry); const endedAgoMs = typeof entry.endedAt === "number" ? Math.max(0, Date.now() - entry.endedAt) : undefined; const endedAgoLabel = endedAgoMs != null ? `${Math.round(endedAgoMs / 1000)}s` : "n/a"; - const deliveryError = entry.lastAnnounceDeliveryError?.trim() - ? ` deliveryError=${formatAnnounceGiveUpLogField(entry.lastAnnounceDeliveryError)}` + const lastDeliveryError = getDeliveryLastError(entry); + const deliveryError = lastDeliveryError + ? ` deliveryError=${formatAnnounceGiveUpLogField(lastDeliveryError)}` : ""; defaultRuntime.log( `[warn] Subagent announce give up (${reason}) run=${entry.runId} child=${entry.childSessionKey} requester=${entry.requesterSessionKey} retries=${retryCount} endedAgo=${endedAgoLabel}${deliveryError}`, diff --git a/src/agents/subagent-registry-lifecycle.test.ts b/src/agents/subagent-registry-lifecycle.test.ts index 82c9e49e9b2..37fe4969c56 100644 --- a/src/agents/subagent-registry-lifecycle.test.ts +++ b/src/agents/subagent-registry-lifecycle.test.ts @@ -573,10 +573,10 @@ describe("subagent registry lifecycle hardening", () => { }), ).resolves.toBeUndefined(); - await vi.waitFor(() => expect(entry.completionAnnouncedAt).toBe(12_300)); - expect(entry.completionEnqueuedAt).toBe(4_100); - expect(entry.completionDeliveredAt).toBe(12_300); - expect(entry.lastAnnounceDropReason).toBeUndefined(); + await vi.waitFor(() => expect(entry.delivery?.announcedAt).toBe(12_300)); + expect(entry.delivery?.enqueuedAt).toBe(4_100); + expect(entry.delivery?.deliveredAt).toBe(12_300); + expect(entry.delivery?.lastDropReason).toBeUndefined(); expectFields(firstCallArg(taskExecutorMocks.setDetachedTaskDeliveryStatusByRunId), { runId: entry.runId, deliveryStatus: "delivered", @@ -611,7 +611,7 @@ describe("subagent registry lifecycle hardening", () => { expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); expect(hasDeliveredTaskStatusUpdate(entry.runId)).toBe(false); await vi.waitFor(() => expect(entry.cleanupCompletedAt).toBeTypeOf("number")); - expect(entry.completionAnnouncedAt).toBeUndefined(); + expect(entry.delivery?.announcedAt).toBeUndefined(); }); it("archives delete-mode sessions when completion messages are disabled", async () => { @@ -655,7 +655,7 @@ describe("subagent registry lifecycle hardening", () => { expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); expect(hasDeliveredTaskStatusUpdate(entry.runId)).toBe(false); await vi.waitFor(() => expect(runs.has(entry.runId)).toBe(false)); - expect(entry.completionAnnouncedAt).toBeUndefined(); + expect(entry.delivery?.announcedAt).toBeUndefined(); }); it("retires bundle MCP runtimes when run-mode cleanup completes", async () => { @@ -832,7 +832,7 @@ describe("subagent registry lifecycle hardening", () => { ).resolves.toBeUndefined(); expect(captureSubagentCompletionReply).not.toHaveBeenCalled(); - expect(entry.frozenResultText).toBeNull(); + expect(entry.completion?.resultText).toBeNull(); expectFields(firstCallArg(taskExecutorMocks.failTaskRunByRunId), { status: "failed", error: "All models failed (2): timeout", @@ -843,7 +843,7 @@ describe("subagent registry lifecycle hardening", () => { it("does not re-run announce flow after completion was already delivered", async () => { const entry = createRunEntry({ - completionAnnouncedAt: 3_500, + delivery: { status: "delivered", announcedAt: 3_500, deliveredAt: 3_500 }, endedAt: 4_000, }); const persist = vi.fn(); @@ -880,7 +880,7 @@ describe("subagent registry lifecycle hardening", () => { it("emits ended hook while retrying cleanup after completion was already delivered", async () => { const entry = createRunEntry({ - completionAnnouncedAt: 3_500, + delivery: { status: "delivered", announcedAt: 3_500, deliveredAt: 3_500 }, endedAt: 4_000, expectsCompletionMessage: true, }); @@ -924,7 +924,7 @@ describe("subagent registry lifecycle hardening", () => { captureSubagentCompletionReply: vi.fn(async () => undefined), }); - expect(entry.completionAnnouncedAt).toBeUndefined(); + expect(entry.delivery?.announcedAt).toBeUndefined(); await controller.finalizeResumedAnnounceGiveUp({ runId: entry.runId, @@ -942,8 +942,8 @@ describe("subagent registry lifecycle hardening", () => { endedAt: 4_000, endedReason: SUBAGENT_ENDED_REASON_COMPLETE, expectsCompletionMessage: true, - frozenResultText: "final answer", - lastAnnounceDeliveryError: "gateway request timeout for agent", + completion: { required: true, resultText: "final answer" }, + delivery: { status: "pending", lastError: "gateway request timeout for agent" }, outcome: { status: "ok" }, retainAttachmentsOnKeep: true, }); @@ -960,15 +960,15 @@ describe("subagent registry lifecycle hardening", () => { reason: "retry-limit", }); - expect(entry.pendingFinalDelivery).toBe(true); - expect(entry.pendingFinalDeliveryPayload).toMatchObject({ + expect(entry.delivery?.status).toBe("suspended"); + expect(entry.delivery?.payload).toMatchObject({ requesterSessionKey: entry.requesterSessionKey, childSessionKey: entry.childSessionKey, childRunId: entry.runId, frozenResultText: "final answer", }); - expect(entry.deliverySuspendedAt).toBeTypeOf("number"); - expect(entry.deliverySuspendedReason).toBe("retry-limit"); + expect(entry.delivery?.suspendedAt).toBeTypeOf("number"); + expect(entry.delivery?.suspendedReason).toBe("retry-limit"); expect(entry.cleanupHandled).toBe(false); expect(entry.cleanupCompletedAt).toBeUndefined(); expect(helperMocks.safeRemoveAttachmentsDir).not.toHaveBeenCalled(); @@ -1015,7 +1015,7 @@ describe("subagent registry lifecycle hardening", () => { endedAt: 4_000, endedReason, expectsCompletionMessage: true, - lastAnnounceDeliveryError: "gateway request timeout for agent", + delivery: { status: "pending", lastError: "gateway request timeout for agent" }, outcome, retainAttachmentsOnKeep: true, }); @@ -1032,10 +1032,9 @@ describe("subagent registry lifecycle hardening", () => { reason: "retry-limit", }); - expect(entry.pendingFinalDelivery).toBeUndefined(); - expect(entry.pendingFinalDeliveryPayload).toBeUndefined(); - expect(entry.deliverySuspendedAt).toBeUndefined(); - expect(entry.deliverySuspendedReason).toBeUndefined(); + expect(entry.delivery?.payload).toBeUndefined(); + expect(entry.delivery?.suspendedAt).toBeUndefined(); + expect(entry.delivery?.suspendedReason).toBeUndefined(); expect(entry.cleanupCompletedAt).toBeTypeOf("number"); expect(persist).toHaveBeenCalled(); }, @@ -1152,17 +1151,17 @@ describe("subagent registry lifecycle hardening", () => { error: "UNAVAILABLE: requester wake failed; direct-primary: UNAVAILABLE: requester wake failed", }); - expect(entry.lastAnnounceDeliveryError).toBe( + expect(entry.delivery?.lastError).toBe( "UNAVAILABLE: requester wake failed; direct-primary: UNAVAILABLE: requester wake failed", ); - expect(entry.pendingFinalDelivery).toBe(true); - expect(entry.pendingFinalDeliveryPayload).toMatchObject({ + expect(entry.delivery?.status).toBe("suspended"); + expect(entry.delivery?.payload).toMatchObject({ requesterSessionKey: entry.requesterSessionKey, childSessionKey: entry.childSessionKey, childRunId: entry.runId, }); - expect(entry.deliverySuspendedAt).toBeTypeOf("number"); - expect(entry.deliverySuspendedReason).toBe("retry-limit"); + expect(entry.delivery?.suspendedAt).toBeTypeOf("number"); + expect(entry.delivery?.suspendedReason).toBe("retry-limit"); expect(entry.cleanupCompletedAt).toBeUndefined(); expectFields( findCallArg( @@ -1190,11 +1189,11 @@ describe("subagent registry lifecycle hardening", () => { params: { sessionKey: entry.requesterSessionKey, limit: 25, maxChars: 128 * 1024 }, timeoutMs: 5_000, }); - expect(entry.completionDeliveredAt).toBe(12_345); - expect(entry.completionAnnouncedAt).toBe(12_345); - expect(entry.lastAnnounceDeliveryError).toBeUndefined(); - expect(entry.pendingFinalDelivery).toBeUndefined(); - expect(entry.announceRetryCount).toBeUndefined(); + expect(entry.delivery?.deliveredAt).toBe(12_345); + expect(entry.delivery?.announcedAt).toBe(12_345); + expect(entry.delivery?.lastError).toBeUndefined(); + expect(entry.delivery?.payload).toBeUndefined(); + expect(entry.delivery?.attemptCount).toBeUndefined(); expect(hasDeliveredTaskStatusUpdate(entry.runId)).toBe(true); expect(helperMocks.logAnnounceGiveUp).not.toHaveBeenCalled(); @@ -1206,7 +1205,7 @@ describe("subagent registry lifecycle hardening", () => { }); await vi.waitFor(() => expect(longMirrorEntry.cleanupCompletedAt).toBeTypeOf("number")); - expect(longMirrorEntry.completionDeliveredAt).toBe(12_345); + expect(longMirrorEntry.delivery?.deliveredAt).toBe(12_345); expect(gatewayMocks.callGateway).toHaveBeenCalledWith({ method: "chat.history", params: { sessionKey: longMirrorEntry.requesterSessionKey, limit: 25, maxChars: 128 * 1024 }, @@ -1224,7 +1223,7 @@ describe("subagent registry lifecycle hardening", () => { await vi.waitFor(() => expect(messageToolAnnounceEntry.cleanupCompletedAt).toBeTypeOf("number"), ); - expect(messageToolAnnounceEntry.completionDeliveredAt).toBe(12_345); + expect(messageToolAnnounceEntry.delivery?.deliveredAt).toBe(12_345); vi.clearAllMocks(); gatewayMocks.callGateway.mockResolvedValue({}); @@ -1234,19 +1233,17 @@ describe("subagent registry lifecycle hardening", () => { }); await vi.waitFor(() => expect(childRunMirrorEntry.cleanupCompletedAt).toBeTypeOf("number")); - expect(childRunMirrorEntry.completionDeliveredAt).toBe(12_345); + expect(childRunMirrorEntry.delivery?.deliveredAt).toBe(12_345); vi.clearAllMocks(); taskExecutorMocks.setDetachedTaskDeliveryStatusByRunId.mockReset(); gatewayMocks.callGateway.mockResolvedValue({}); const staleEntry = await runNoReplyMirrorScenario({ timestamp: 1_999 }); - await vi.waitFor(() => expect(staleEntry.deliverySuspendedAt).toBeTypeOf("number")); - expect(staleEntry.completionDeliveredAt).toBeUndefined(); - expect(staleEntry.completionAnnouncedAt).toBeUndefined(); - expect(staleEntry.lastAnnounceDeliveryError).toBe( - "completion agent did not produce a visible reply", - ); + await vi.waitFor(() => expect(staleEntry.delivery?.suspendedAt).toBeTypeOf("number")); + expect(staleEntry.delivery?.deliveredAt).toBeUndefined(); + expect(staleEntry.delivery?.announcedAt).toBeUndefined(); + expect(staleEntry.delivery?.lastError).toBe("completion agent did not produce a visible reply"); expect(hasDeliveredTaskStatusUpdate(staleEntry.runId)).toBe(false); expectFields(firstCallArg(taskExecutorMocks.setDetachedTaskDeliveryStatusByRunId), { runId: staleEntry.runId, @@ -1276,10 +1273,12 @@ describe("subagent registry lifecycle hardening", () => { )}:internal-source-reply:0`, }); - await vi.waitFor(() => expect(sameWindowSiblingEntry.deliverySuspendedAt).toBeTypeOf("number")); - expect(sameWindowSiblingEntry.completionDeliveredAt).toBeUndefined(); - expect(sameWindowSiblingEntry.completionAnnouncedAt).toBeUndefined(); - expect(sameWindowSiblingEntry.lastAnnounceDeliveryError).toBe( + await vi.waitFor(() => + expect(sameWindowSiblingEntry.delivery?.suspendedAt).toBeTypeOf("number"), + ); + expect(sameWindowSiblingEntry.delivery?.deliveredAt).toBeUndefined(); + expect(sameWindowSiblingEntry.delivery?.announcedAt).toBeUndefined(); + expect(sameWindowSiblingEntry.delivery?.lastError).toBe( "completion agent did not produce a visible reply", ); expect(hasDeliveredTaskStatusUpdate(sameWindowSiblingEntry.runId)).toBe(false); diff --git a/src/agents/subagent-registry-lifecycle.ts b/src/agents/subagent-registry-lifecycle.ts index a95f0574434..98ea3be6d41 100644 --- a/src/agents/subagent-registry-lifecycle.ts +++ b/src/agents/subagent-registry-lifecycle.ts @@ -20,9 +20,17 @@ import { buildAnnounceIdFromChildRun, buildAnnounceIdempotencyKey, } from "./announce-idempotency.js"; +import { removeInternalSessionEffectsTranscript } from "./internal-session-effects.js"; import { retireSessionMcpRuntimeForSessionKey } from "./pi-bundle-mcp-tools.js"; import type { SubagentAnnounceDeliveryResult } from "./subagent-announce-dispatch.js"; import { type SubagentRunOutcome, withSubagentOutcomeTiming } from "./subagent-announce-output.js"; +import { + clearDeliveryState, + ensureCompletionState, + ensureDeliveryState, + getDeliveryLastError, + isDeliverySuspended, +} from "./subagent-delivery-state.js"; import { SUBAGENT_ENDED_REASON_COMPLETE, type SubagentLifecycleEndedReason, @@ -162,19 +170,21 @@ export function createSubagentRegistryLifecycleController(params: { entry: SubagentRunRecord, delivery: SubagentAnnounceDeliveryResult, ) => { + const deliveryState = ensureDeliveryState(entry); if (typeof delivery.enqueuedAt === "number") { - entry.completionEnqueuedAt ??= delivery.enqueuedAt; + deliveryState.enqueuedAt ??= delivery.enqueuedAt; } if (delivery.delivered) { const deliveredAt = typeof delivery.deliveredAt === "number" ? delivery.deliveredAt : Date.now(); - entry.completionDeliveredAt = deliveredAt; - entry.lastAnnounceDropReason = undefined; + deliveryState.deliveredAt = deliveredAt; + deliveryState.lastDropReason = undefined; } }; const hasPriorRequesterDeliveryMirror = async (entry: SubagentRunRecord): Promise => { - const expectedText = extractTextFromChatContent(entry.frozenResultText, { joinWith: "" }); + const completion = ensureCompletionState(entry); + const expectedText = extractTextFromChatContent(completion.resultText, { joinWith: "" }); if (entry.expectsCompletionMessage !== true || expectedText == null) { return false; } @@ -229,7 +239,7 @@ export function createSubagentRegistryLifecycleController(params: { ); }); if (mirror) { - entry.completionDeliveredAt = (mirror as { timestamp: number }).timestamp; + ensureDeliveryState(entry).deliveredAt = (mirror as { timestamp: number }).timestamp; } return Boolean(mirror); } catch { @@ -269,9 +279,10 @@ export function createSubagentRegistryLifecycleController(params: { const lastEventAt = endedAt; try { if (args.outcome.status === "ok") { + const completion = ensureCompletionState(args.entry); const terminalResult = args.entry.expectsCompletionMessage === true - ? resolveRequiredCompletionTerminalResult(args.entry.frozenResultText) + ? resolveRequiredCompletionTerminalResult(completion.resultText) : {}; completeTaskRunByRunId({ runId: args.entry.runId, @@ -279,7 +290,7 @@ export function createSubagentRegistryLifecycleController(params: { sessionKey: args.entry.childSessionKey, endedAt, lastEventAt, - progressSummary: args.entry.frozenResultText ?? undefined, + progressSummary: completion.resultText ?? undefined, terminalSummary: terminalResult.terminalSummary ?? null, terminalOutcome: terminalResult.terminalOutcome, }); @@ -293,7 +304,7 @@ export function createSubagentRegistryLifecycleController(params: { endedAt, lastEventAt, error: args.outcome.status === "error" ? args.outcome.error : undefined, - progressSummary: args.entry.frozenResultText ?? undefined, + progressSummary: ensureCompletionState(args.entry).resultText ?? undefined, terminalSummary: null, }); } catch (err) { @@ -322,7 +333,7 @@ export function createSubagentRegistryLifecycleController(params: { sessionKey: args.entry.childSessionKey, endedAt, lastEventAt: Date.now(), - progressSummary: args.entry.frozenResultText ?? undefined, + progressSummary: ensureCompletionState(args.entry).resultText ?? undefined, terminalSummary: terminalResult.terminalSummary, terminalOutcome: terminalResult.terminalOutcome, }); @@ -339,24 +350,26 @@ export function createSubagentRegistryLifecycleController(params: { entry: SubagentRunRecord, outcome: SubagentRunOutcome, ): Promise => { - if (entry.frozenResultText !== undefined) { + const completion = ensureCompletionState(entry); + if (completion.resultText !== undefined) { return false; } if (outcome.status === "error") { - entry.frozenResultText = null; - entry.frozenResultCapturedAt = Date.now(); + completion.resultText = null; + completion.capturedAt = Date.now(); return true; } try { const captured = await params.captureSubagentCompletionReply(entry.childSessionKey, { waitForReply: entry.expectsCompletionMessage === true, outcome, + sessionFile: entry.execution?.transcriptFile, }); - entry.frozenResultText = captured?.trim() ? capFrozenResultText(captured) : null; + completion.resultText = captured?.trim() ? capFrozenResultText(captured) : null; } catch { - entry.frozenResultText = null; + completion.resultText = null; } - entry.frozenResultCapturedAt = Date.now(); + completion.capturedAt = Date.now(); return true; }; @@ -407,14 +420,16 @@ export function createSubagentRegistryLifecycleController(params: { const capturedAt = Date.now(); let changed = false; for (const entry of candidates) { - if (entry.frozenResultText === nextFrozen) { + const completion = ensureCompletionState(entry); + if (completion.resultText === nextFrozen) { continue; } - entry.frozenResultText = nextFrozen; - entry.frozenResultCapturedAt = capturedAt; - if (entry.pendingFinalDeliveryPayload) { - entry.pendingFinalDeliveryPayload = { - ...entry.pendingFinalDeliveryPayload, + completion.resultText = nextFrozen; + completion.capturedAt = capturedAt; + const delivery = entry.delivery; + if (delivery?.payload) { + delivery.payload = { + ...delivery.payload, frozenResultText: nextFrozen, }; } @@ -446,12 +461,17 @@ export function createSubagentRegistryLifecycleController(params: { }; const clearPendingFinalDelivery = (entry: SubagentRunRecord) => { - entry.pendingFinalDelivery = undefined; - entry.pendingFinalDeliveryCreatedAt = undefined; - entry.pendingFinalDeliveryLastAttemptAt = undefined; - entry.pendingFinalDeliveryAttemptCount = undefined; - entry.pendingFinalDeliveryLastError = undefined; - entry.pendingFinalDeliveryPayload = undefined; + const delivery = ensureDeliveryState(entry); + delivery.payload = undefined; + delivery.createdAt = undefined; + delivery.lastAttemptAt = undefined; + delivery.attemptCount = undefined; + delivery.lastError = undefined; + delivery.suspendedAt = undefined; + delivery.suspendedReason = undefined; + if (delivery.status !== "delivered" && delivery.status !== "failed") { + clearDeliveryState(entry); + } }; const loadPendingFinalDeliveryPayload = ( @@ -459,28 +479,25 @@ export function createSubagentRegistryLifecycleController(params: { ): PendingFinalDeliveryPayload => { return { requesterSessionKey: - entry.pendingFinalDeliveryPayload?.requesterSessionKey ?? entry.requesterSessionKey, - requesterOrigin: entry.pendingFinalDeliveryPayload?.requesterOrigin ?? entry.requesterOrigin, + entry.delivery?.payload?.requesterSessionKey ?? entry.requesterSessionKey, + requesterOrigin: entry.delivery?.payload?.requesterOrigin ?? entry.requesterOrigin, requesterDisplayKey: - entry.pendingFinalDeliveryPayload?.requesterDisplayKey ?? entry.requesterDisplayKey, - childSessionKey: entry.pendingFinalDeliveryPayload?.childSessionKey ?? entry.childSessionKey, - childRunId: entry.pendingFinalDeliveryPayload?.childRunId ?? entry.runId, - task: entry.pendingFinalDeliveryPayload?.task ?? entry.task, - label: entry.pendingFinalDeliveryPayload?.label ?? entry.label, - startedAt: entry.pendingFinalDeliveryPayload?.startedAt ?? entry.startedAt, - endedAt: entry.pendingFinalDeliveryPayload?.endedAt ?? entry.endedAt, - outcome: entry.pendingFinalDeliveryPayload?.outcome ?? entry.outcome, + entry.delivery?.payload?.requesterDisplayKey ?? entry.requesterDisplayKey, + childSessionKey: entry.delivery?.payload?.childSessionKey ?? entry.childSessionKey, + childRunId: entry.delivery?.payload?.childRunId ?? entry.runId, + task: entry.delivery?.payload?.task ?? entry.task, + label: entry.delivery?.payload?.label ?? entry.label, + startedAt: entry.delivery?.payload?.startedAt ?? entry.startedAt, + endedAt: entry.delivery?.payload?.endedAt ?? entry.endedAt, + outcome: entry.delivery?.payload?.outcome ?? entry.outcome, expectsCompletionMessage: - entry.pendingFinalDeliveryPayload?.expectsCompletionMessage ?? - entry.expectsCompletionMessage, - spawnMode: entry.pendingFinalDeliveryPayload?.spawnMode ?? entry.spawnMode, - frozenResultText: - entry.pendingFinalDeliveryPayload?.frozenResultText ?? entry.frozenResultText, + entry.delivery?.payload?.expectsCompletionMessage ?? entry.expectsCompletionMessage, + spawnMode: entry.delivery?.payload?.spawnMode ?? entry.spawnMode, + frozenResultText: entry.delivery?.payload?.frozenResultText ?? entry.completion?.resultText, fallbackFrozenResultText: - entry.pendingFinalDeliveryPayload?.fallbackFrozenResultText ?? - entry.fallbackFrozenResultText, + entry.delivery?.payload?.fallbackFrozenResultText ?? entry.completion?.fallbackResultText, wakeOnDescendantSettle: - entry.pendingFinalDeliveryPayload?.wakeOnDescendantSettle ?? entry.wakeOnDescendantSettle, + entry.delivery?.payload?.wakeOnDescendantSettle ?? entry.wakeOnDescendantSettle, }; }; @@ -488,13 +505,13 @@ export function createSubagentRegistryLifecycleController(params: { const now = Date.now(); const payload: PendingFinalDeliveryPayload = loadPendingFinalDeliveryPayload(args.entry); - args.entry.pendingFinalDelivery = true; - args.entry.pendingFinalDeliveryCreatedAt ??= now; - args.entry.pendingFinalDeliveryLastAttemptAt = now; - args.entry.pendingFinalDeliveryAttemptCount = - (args.entry.pendingFinalDeliveryAttemptCount ?? 0) + 1; - args.entry.pendingFinalDeliveryLastError = args.error ?? null; - args.entry.pendingFinalDeliveryPayload = payload; + const delivery = ensureDeliveryState(args.entry); + delivery.status = "pending"; + delivery.createdAt ??= now; + delivery.lastAttemptAt = now; + delivery.attemptCount = (delivery.attemptCount ?? 0) + 1; + delivery.lastError = args.error ?? null; + delivery.payload = payload; }; const suspendPendingFinalDelivery = (args: { @@ -505,25 +522,28 @@ export function createSubagentRegistryLifecycleController(params: { }) => { markPendingFinalDelivery({ entry: args.entry, - error: args.error ?? args.entry.lastAnnounceDeliveryError ?? args.reason, + error: args.error ?? getDeliveryLastError(args.entry) ?? args.reason, }); const now = Date.now(); - args.entry.deliverySuspendedAt ??= now; - args.entry.deliverySuspendedReason = args.reason; + const delivery = ensureDeliveryState(args.entry); + delivery.status = "suspended"; + delivery.suspendedAt ??= now; + delivery.suspendedReason = args.reason; args.entry.cleanupHandled = false; args.entry.wakeOnDescendantSettle = undefined; - args.entry.fallbackFrozenResultText = undefined; - args.entry.fallbackFrozenResultCapturedAt = undefined; + const completion = ensureCompletionState(args.entry); + completion.fallbackResultText = undefined; + completion.fallbackCapturedAt = undefined; params.resumedRuns.delete(args.runId); safeSetSubagentTaskDeliveryStatus({ runId: args.runId, childSessionKey: args.entry.childSessionKey, deliveryStatus: "failed", - deliveryError: args.entry.lastAnnounceDeliveryError ?? args.reason, + deliveryError: getDeliveryLastError(args.entry) ?? args.reason, }); safeMarkRequiredCompletionDeliveryBlocked({ entry: args.entry, - reason: args.entry.lastAnnounceDeliveryError ?? args.reason, + reason: getDeliveryLastError(args.entry) ?? args.reason, }); logAnnounceGiveUp(args.entry, args.reason); params.persist(); @@ -545,24 +565,29 @@ export function createSubagentRegistryLifecycleController(params: { runId: giveUpParams.runId, entry: giveUpParams.entry, reason: giveUpParams.reason, - error: giveUpParams.entry.lastAnnounceDeliveryError, + error: getDeliveryLastError(giveUpParams.entry), }); return; } + const deliveryError = getDeliveryLastError(giveUpParams.entry) ?? giveUpParams.reason; clearPendingFinalDelivery(giveUpParams.entry); + const failedDelivery = ensureDeliveryState(giveUpParams.entry); + failedDelivery.status = "failed"; + failedDelivery.lastError = deliveryError; safeSetSubagentTaskDeliveryStatus({ runId: giveUpParams.runId, childSessionKey: giveUpParams.entry.childSessionKey, deliveryStatus: "failed", - deliveryError: giveUpParams.entry.lastAnnounceDeliveryError, + deliveryError, }); safeMarkRequiredCompletionDeliveryBlocked({ entry: giveUpParams.entry, - reason: giveUpParams.entry.lastAnnounceDeliveryError ?? giveUpParams.reason, + reason: deliveryError, }); giveUpParams.entry.wakeOnDescendantSettle = undefined; - giveUpParams.entry.fallbackFrozenResultText = undefined; - giveUpParams.entry.fallbackFrozenResultCapturedAt = undefined; + const completion = ensureCompletionState(giveUpParams.entry); + completion.fallbackResultText = undefined; + completion.fallbackCapturedAt = undefined; const shouldDeleteAttachments = giveUpParams.entry.cleanup === "delete" || !giveUpParams.entry.retainAttachmentsOnKeep; if (shouldDeleteAttachments) { @@ -606,7 +631,7 @@ export function createSubagentRegistryLifecycleController(params: { if (entry.cleanupCompletedAt || entry.cleanupHandled) { continue; } - if (entry.pendingFinalDelivery === true && typeof entry.deliverySuspendedAt === "number") { + if (isDeliverySuspended(entry)) { continue; } if (params.suppressAnnounceForSteerRestart(entry)) { @@ -645,6 +670,7 @@ export function createSubagentRegistryLifecycleController(params: { cleanup: "delete" | "keep"; completedAt: number; }) => { + void removeInternalSessionEffectsTranscript(cleanupParams.entry.execution?.transcriptFile); if (cleanupParams.entry.spawnMode !== "session") { void retireSessionMcpRuntimeForSessionKey({ sessionKey: cleanupParams.entry.childSessionKey, @@ -720,14 +746,18 @@ export function createSubagentRegistryLifecycleController(params: { } if (didAnnounce) { if (!options?.skipAnnounce) { - const deliveredAt = entry.completionDeliveredAt ?? Date.now(); - entry.completionDeliveredAt = deliveredAt; - entry.completionAnnouncedAt = deliveredAt; + const delivery = ensureDeliveryState(entry); + const deliveredAt = delivery.deliveredAt ?? Date.now(); + delivery.status = "delivered"; + delivery.deliveredAt = deliveredAt; + delivery.announcedAt = deliveredAt; params.persist(); } clearPendingFinalDelivery(entry); - entry.deliverySuspendedAt = undefined; - entry.deliverySuspendedReason = undefined; + const delivery = ensureDeliveryState(entry); + delivery.status = "delivered"; + delivery.suspendedAt = undefined; + delivery.suspendedReason = undefined; if (!options?.skipDeliveryStatus) { safeSetSubagentTaskDeliveryStatus({ runId, @@ -735,11 +765,12 @@ export function createSubagentRegistryLifecycleController(params: { deliveryStatus: "delivered", }); } - entry.lastAnnounceDeliveryError = undefined; - entry.lastAnnounceDropReason = undefined; + delivery.lastError = undefined; + delivery.lastDropReason = undefined; entry.wakeOnDescendantSettle = undefined; - entry.fallbackFrozenResultText = undefined; - entry.fallbackFrozenResultCapturedAt = undefined; + const completion = ensureCompletionState(entry); + completion.fallbackResultText = undefined; + completion.fallbackCapturedAt = undefined; const completionReason = resolveCleanupCompletionReason(entry); await emitCompletionEndedHookIfNeeded(entry, completionReason); const shouldDeleteAttachments = cleanup === "delete" || !entry.retainAttachmentsOnKeep; @@ -747,8 +778,8 @@ export function createSubagentRegistryLifecycleController(params: { await safeRemoveAttachmentsDir(entry); } if (cleanup === "delete") { - entry.frozenResultText = undefined; - entry.frozenResultCapturedAt = undefined; + completion.resultText = undefined; + completion.capturedAt = undefined; } completeCleanupBookkeeping({ runId, @@ -772,7 +803,7 @@ export function createSubagentRegistryLifecycleController(params: { }); if (deferredDecision.kind === "defer-descendants") { - entry.lastAnnounceRetryAt = now; + ensureDeliveryState(entry).lastAttemptAt = now; entry.wakeOnDescendantSettle = true; entry.cleanupHandled = false; params.resumedRuns.delete(runId); @@ -781,35 +812,39 @@ export function createSubagentRegistryLifecycleController(params: { return; } - if (deferredDecision.retryCount != null) { - entry.announceRetryCount = deferredDecision.retryCount; - entry.lastAnnounceRetryAt = now; - } - if (deferredDecision.kind === "give-up") { if (shouldSuspendPendingFinalDelivery(entry)) { suspendPendingFinalDelivery({ runId, entry, reason: deferredDecision.reason, - error: entry.lastAnnounceDeliveryError, + error: getDeliveryLastError(entry), }); return; } + const deliveryError = getDeliveryLastError(entry) ?? deferredDecision.reason; clearPendingFinalDelivery(entry); + const failedDelivery = ensureDeliveryState(entry); + failedDelivery.status = "failed"; + failedDelivery.lastError = deliveryError; + if (deferredDecision.retryCount != null) { + failedDelivery.attemptCount = deferredDecision.retryCount; + failedDelivery.lastAttemptAt = now; + } safeSetSubagentTaskDeliveryStatus({ runId, childSessionKey: entry.childSessionKey, deliveryStatus: "failed", - deliveryError: entry.lastAnnounceDeliveryError, + deliveryError, }); safeMarkRequiredCompletionDeliveryBlocked({ entry, - reason: entry.lastAnnounceDeliveryError ?? deferredDecision.reason, + reason: deliveryError, }); entry.wakeOnDescendantSettle = undefined; - entry.fallbackFrozenResultText = undefined; - entry.fallbackFrozenResultCapturedAt = undefined; + const completion = ensureCompletionState(entry); + completion.fallbackResultText = undefined; + completion.fallbackCapturedAt = undefined; const shouldDeleteAttachments = cleanup === "delete" || !entry.retainAttachmentsOnKeep; if (shouldDeleteAttachments) { await safeRemoveAttachmentsDir(entry); @@ -842,7 +877,7 @@ export function createSubagentRegistryLifecycleController(params: { }; const startSubagentAnnounceCleanupFlow = (runId: string, entry: SubagentRunRecord): boolean => { - if (typeof entry.completionAnnouncedAt === "number") { + if (typeof entry.delivery?.announcedAt === "number" || entry.delivery?.status === "delivered") { if (!beginSubagentCleanup(runId)) { return false; } @@ -894,7 +929,7 @@ export function createSubagentRegistryLifecycleController(params: { } const pendingPayload = loadPendingFinalDeliveryPayload(entry); const requesterOrigin = normalizeDeliveryContext(pendingPayload.requesterOrigin); - let latestDeliveryError = entry.lastAnnounceDeliveryError; + let latestDeliveryError = getDeliveryLastError(entry); const finalizeAnnounceCleanup = async (didAnnounce: boolean) => { const shouldCreditPriorDelivery = !didAnnounce && (await hasPriorRequesterDeliveryMirror(entry)); @@ -902,7 +937,7 @@ export function createSubagentRegistryLifecycleController(params: { latestDeliveryError = undefined; } if (!didAnnounce && latestDeliveryError) { - entry.lastAnnounceDeliveryError = latestDeliveryError; + ensureDeliveryState(entry).lastError = latestDeliveryError; } void finalizeSubagentCleanup( runId, @@ -942,19 +977,20 @@ export function createSubagentRegistryLifecycleController(params: { onDeliveryResult: (delivery) => { recordAnnounceDeliveryResult(entry, delivery); if (delivery.delivered) { - if (entry.lastAnnounceDeliveryError !== undefined) { - entry.lastAnnounceDeliveryError = undefined; + const deliveryState = ensureDeliveryState(entry); + if (deliveryState.lastError !== undefined) { + deliveryState.lastError = undefined; params.persist(); } latestDeliveryError = undefined; return; } if (delivery.path === "none") { - entry.lastAnnounceDropReason = "sink_unavailable"; + ensureDeliveryState(entry).lastDropReason = "sink_unavailable"; } latestDeliveryError = formatAnnounceDeliveryError(delivery); - if (entry.lastAnnounceDeliveryError !== latestDeliveryError) { - entry.lastAnnounceDeliveryError = latestDeliveryError; + if (ensureDeliveryState(entry).lastError !== latestDeliveryError) { + ensureDeliveryState(entry).lastError = latestDeliveryError; params.persist(); } }, @@ -995,7 +1031,7 @@ export function createSubagentRegistryLifecycleController(params: { entry.suppressAnnounceReason = undefined; entry.cleanupHandled = false; entry.cleanupCompletedAt = undefined; - entry.completionAnnouncedAt = undefined; + ensureDeliveryState(entry).announcedAt = undefined; mutated = true; } @@ -1003,6 +1039,12 @@ export function createSubagentRegistryLifecycleController(params: { typeof completeParams.endedAt === "number" ? completeParams.endedAt : Date.now(); if (entry.endedAt !== endedAt) { entry.endedAt = endedAt; + entry.execution = { + ...entry.execution, + status: "terminal", + startedAt: entry.startedAt, + endedAt, + }; mutated = true; } const outcome = withSubagentOutcomeTiming(completeParams.outcome, { @@ -1013,6 +1055,20 @@ export function createSubagentRegistryLifecycleController(params: { entry.outcome = outcome; mutated = true; } + if ( + entry.execution?.status !== "terminal" || + entry.execution.endedAt !== endedAt || + entry.execution.outcome !== outcome + ) { + entry.execution = { + ...entry.execution, + status: "terminal", + startedAt: entry.startedAt, + endedAt, + outcome, + }; + mutated = true; + } if (entry.endedReason !== completeParams.reason) { entry.endedReason = completeParams.reason; mutated = true; diff --git a/src/agents/subagent-registry-maintenance.ts b/src/agents/subagent-registry-maintenance.ts index 94e0ee33090..6e42657b5d1 100644 --- a/src/agents/subagent-registry-maintenance.ts +++ b/src/agents/subagent-registry-maintenance.ts @@ -1,4 +1,5 @@ import { registerSessionMaintenancePreserveKeysProvider } from "../config/sessions/store-maintenance-preserve.js"; +import { isDeliverySuspended } from "./subagent-delivery-state.js"; import { subagentRuns } from "./subagent-registry-memory.js"; import { getSubagentRunsSnapshotForRead } from "./subagent-registry-state.js"; import type { SubagentRunRecord } from "./subagent-registry.types.js"; @@ -12,11 +13,11 @@ function isActiveForMaintenance(entry: SubagentRunRecord): boolean { } function isPendingFinalDeliveryForMaintenance(entry: SubagentRunRecord): boolean { - return entry.pendingFinalDelivery === true; + return entry.delivery?.status === "pending" || isDeliverySuspended(entry); } function isAwaitingCompletionAnnounceForMaintenance(entry: SubagentRunRecord): boolean { - return entry.expectsCompletionMessage === true && typeof entry.completionAnnouncedAt !== "number"; + return entry.expectsCompletionMessage === true && entry.delivery?.status !== "delivered"; } function shouldPreserveForMaintenance(entry: SubagentRunRecord): boolean { diff --git a/src/agents/subagent-registry-run-manager.ts b/src/agents/subagent-registry-run-manager.ts index fd635e2cb64..2f58929419b 100644 --- a/src/agents/subagent-registry-run-manager.ts +++ b/src/agents/subagent-registry-run-manager.ts @@ -7,9 +7,11 @@ import { formatBlockedLivenessError, isBlockedLivenessState } from "../shared/ag import { createRunningTaskRun } from "../tasks/detached-task-runtime.js"; import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js"; import type { DeliveryContext } from "../utils/delivery-context.types.js"; +import { removeInternalSessionEffectsTranscript } from "./internal-session-effects.js"; import { isRecoverableAgentWaitError, waitForAgentRun } from "./run-wait.js"; import type { ensureRuntimePluginsLoaded as ensureRuntimePluginsLoadedFn } from "./runtime-plugins.js"; import { type SubagentRunOutcome, withSubagentOutcomeTiming } from "./subagent-announce-output.js"; +import { ensureCompletionState, normalizeSubagentRunState } from "./subagent-delivery-state.js"; import { SUBAGENT_ENDED_OUTCOME_KILLED, SUBAGENT_ENDED_REASON_COMPLETE, @@ -74,9 +76,10 @@ export function markSubagentRunPausedAfterYield(params: { entry.cleanupHandled = false; mutated = true; } - if (entry.frozenResultText !== undefined) { - entry.frozenResultText = undefined; - entry.frozenResultCapturedAt = undefined; + const completion = ensureCompletionState(entry); + if (completion.resultText !== undefined) { + completion.resultText = undefined; + completion.capturedAt = undefined; mutated = true; } return mutated; @@ -368,6 +371,7 @@ export function createSubagentRunManager(params: { fallback?: SubagentRunRecord; runTimeoutSeconds?: number; preserveFrozenResultFallback?: boolean; + transcriptFile?: string; }) => { const previousRunId = replaceParams.previousRunId.trim(); const nextRunId = replaceParams.nextRunId.trim(); @@ -386,6 +390,12 @@ export function createSubagentRunManager(params: { if (shouldDeleteAttachments(source)) { void safeRemoveAttachmentsDir(source); } + if ( + source.execution?.transcriptFile && + source.execution.transcriptFile !== replaceParams.transcriptFile + ) { + void removeInternalSessionEffectsTranscript(source.execution.transcriptFile); + } params.runs.delete(previousRunId); params.resumedRuns.delete(previousRunId); } @@ -410,7 +420,8 @@ export function createSubagentRunManager(params: { typeof source.endedAt === "number" ? source.endedAt : now, ) ?? 0; - const next: SubagentRunRecord = { + const sourceCompletion = ensureCompletionState(source); + const next: SubagentRunRecord = normalizeSubagentRunState({ ...source, runId: nextRunId, createdAt: now, @@ -423,37 +434,26 @@ export function createSubagentRunManager(params: { endedHookEmittedAt: undefined, wakeOnDescendantSettle: undefined, outcome: undefined, - frozenResultText: undefined, - frozenResultCapturedAt: undefined, - fallbackFrozenResultText: preserveFrozenResultFallback ? source.frozenResultText : undefined, - fallbackFrozenResultCapturedAt: preserveFrozenResultFallback - ? source.frozenResultCapturedAt - : undefined, + execution: { + status: "running", + startedAt: now, + transcriptFile: replaceParams.transcriptFile, + }, + completion: { + required: source.expectsCompletionMessage === true, + fallbackResultText: preserveFrozenResultFallback ? sourceCompletion.resultText : undefined, + fallbackCapturedAt: preserveFrozenResultFallback ? sourceCompletion.capturedAt : undefined, + }, cleanupCompletedAt: undefined, cleanupHandled: false, - completionEnqueuedAt: undefined, - completionDeliveredAt: undefined, - completionAnnouncedAt: undefined, - lastAnnounceDropReason: undefined, suppressAnnounceReason: undefined, - announceRetryCount: undefined, - lastAnnounceRetryAt: undefined, - lastAnnounceDeliveryError: undefined, - pendingFinalDelivery: undefined, - pendingFinalDeliveryCreatedAt: undefined, - pendingFinalDeliveryLastAttemptAt: undefined, - pendingFinalDeliveryAttemptCount: undefined, - pendingFinalDeliveryLastError: undefined, - pendingFinalDeliveryPayload: undefined, - deliverySuspendedAt: undefined, - deliverySuspendedReason: undefined, - deliveryDiscardedAt: undefined, - deliveryDiscardReason: undefined, - deliveryDiscardedPayloadSummary: undefined, + delivery: { + status: source.expectsCompletionMessage === false ? "not_required" : "pending", + }, spawnMode, archiveAtMs, runTimeoutSeconds, - }; + }); params.runs.set(nextRunId, next); params.ensureListener(); @@ -485,7 +485,7 @@ export function createSubagentRunManager(params: { const runTimeoutSeconds = registerParams.runTimeoutSeconds ?? 0; const waitTimeoutMs = params.resolveSubagentWaitTimeoutMs(cfg, runTimeoutSeconds); const requesterOrigin = normalizeDeliveryContext(registerParams.requesterOrigin); - const entry: SubagentRunRecord = { + const entry: SubagentRunRecord = normalizeSubagentRunState({ runId, childSessionKey, controllerSessionKey, @@ -504,16 +504,25 @@ export function createSubagentRunManager(params: { runTimeoutSeconds, createdAt: now, startedAt: now, + execution: { + status: "running", + startedAt: now, + }, + completion: { + required: registerParams.expectsCompletionMessage === true, + }, + delivery: { + status: registerParams.expectsCompletionMessage === false ? "not_required" : "pending", + }, sessionStartedAt: now, accumulatedRuntimeMs: 0, archiveAtMs, cleanupHandled: false, - completionAnnouncedAt: undefined, wakeOnDescendantSettle: undefined, attachmentsDir: registerParams.attachmentsDir, attachmentsRootDir: registerParams.attachmentsRootDir, retainAttachmentsOnKeep: registerParams.retainAttachmentsOnKeep, - }; + }); params.runs.set(runId, entry); try { params.persistOrThrow(); diff --git a/src/agents/subagent-registry-steer-runtime.ts b/src/agents/subagent-registry-steer-runtime.ts index ceb0284320d..c8b37761e48 100644 --- a/src/agents/subagent-registry-steer-runtime.ts +++ b/src/agents/subagent-registry-steer-runtime.ts @@ -6,6 +6,7 @@ type ReplaceSubagentRunAfterSteerParams = { fallback?: SubagentRunRecord; runTimeoutSeconds?: number; preserveFrozenResultFallback?: boolean; + transcriptFile?: string; }; type ReplaceSubagentRunAfterSteerFn = (params: ReplaceSubagentRunAfterSteerParams) => boolean; diff --git a/src/agents/subagent-registry.announce-loop-guard.test.ts b/src/agents/subagent-registry.announce-loop-guard.test.ts index 1a695aa3fc9..5fb222cf465 100644 --- a/src/agents/subagent-registry.announce-loop-guard.test.ts +++ b/src/agents/subagent-registry.announce-loop-guard.test.ts @@ -150,14 +150,13 @@ describe("announce loop guard (#18264)", () => { createdAt: now - 60_000, startedAt: now - 55_000, endedAt: now - 50_000, - announceRetryCount: 3, - lastAnnounceRetryAt: now - 10_000, + delivery: { status: "pending", attemptCount: 3, lastAttemptAt: now - 10_000 }, }); const runs = registry.listSubagentRunsForRequester("agent:main:main"); const entry = requireRunById(runs, "test-loop-guard"); - expect(entry.announceRetryCount).toBe(3); - expect(entry.lastAnnounceRetryAt).toBe(now - 10_000); + expect(entry.delivery?.attemptCount).toBe(3); + expect(entry.delivery?.lastAttemptAt).toBe(now - 10_000); }); test.each([ @@ -175,8 +174,7 @@ describe("announce loop guard (#18264)", () => { startedAt: now - 14 * 60_000, endedAt: now - 10 * 60_000, cleanupCompletedAt: undefined, - announceRetryCount: 3, - lastAnnounceRetryAt: now - 9 * 60_000, + delivery: { status: "pending" as const, attemptCount: 3, lastAttemptAt: now - 9 * 60_000 }, }), }, { @@ -192,8 +190,7 @@ describe("announce loop guard (#18264)", () => { startedAt: now - 90_000, endedAt: now - 60_000, cleanupCompletedAt: undefined, - announceRetryCount: 3, - lastAnnounceRetryAt: now - 30_000, + delivery: { status: "pending" as const, attemptCount: 3, lastAttemptAt: now - 30_000 }, }), }, ])("$name", async ({ createEntry }) => { @@ -278,9 +275,9 @@ describe("announce loop guard (#18264)", () => { const stored = await waitForRun( runId, - (run) => run.cleanupHandled === false && run.announceRetryCount === 1, + (run) => run.cleanupHandled === false && run.delivery?.attemptCount === 1, ); expect(stored.cleanupCompletedAt).toBeUndefined(); - expect(stored.lastAnnounceRetryAt).toBeTypeOf("number"); + expect(stored.delivery?.lastAttemptAt).toBeTypeOf("number"); }); }); diff --git a/src/agents/subagent-registry.lifecycle-retry-grace.e2e.test.ts b/src/agents/subagent-registry.lifecycle-retry-grace.e2e.test.ts index 75f34702934..c85c31c0ac2 100644 --- a/src/agents/subagent-registry.lifecycle-retry-grace.e2e.test.ts +++ b/src/agents/subagent-registry.lifecycle-retry-grace.e2e.test.ts @@ -249,7 +249,7 @@ describe("subagent registry lifecycle error grace", () => { const run = mod .listSubagentRunsForRequester(MAIN_REQUESTER_SESSION_KEY) .find((candidate) => candidate.runId === runId); - if (run?.frozenResultText === expectedText) { + if (run?.completion?.resultText === expectedText) { return run; } await vi.advanceTimersByTimeAsync(1); @@ -432,7 +432,7 @@ describe("subagent registry lifecycle error grace", () => { const runBeforeRefresh = mod .listSubagentRunsForRequester(MAIN_REQUESTER_SESSION_KEY) .find((candidate) => candidate.runId === "run-refresh"); - const firstCapturedAt = runBeforeRefresh?.frozenResultCapturedAt ?? 0; + const firstCapturedAt = runBeforeRefresh?.completion?.capturedAt ?? 0; setAssistantOutput( "agent:main:subagent:refresh", @@ -447,10 +447,10 @@ describe("subagent registry lifecycle error grace", () => { "run-refresh", "All 3 subagents complete. Here's the final summary.", ); - expect(runAfterRefresh?.frozenResultText).toBe( + expect(runAfterRefresh?.completion?.resultText).toBe( "All 3 subagents complete. Here's the final summary.", ); - expect((runAfterRefresh?.frozenResultCapturedAt ?? 0) >= firstCapturedAt).toBe(true); + expect((runAfterRefresh?.completion?.capturedAt ?? 0) >= firstCapturedAt).toBe(true); emitLifecycleEvent("run-refresh", { phase: "end", endedAt: endedAt + 300 }); await flushAsync(); @@ -484,7 +484,7 @@ describe("subagent registry lifecycle error grace", () => { const runAfterSilent = mod .listSubagentRunsForRequester(MAIN_REQUESTER_SESSION_KEY) .find((candidate) => candidate.runId === "run-refresh-silent"); - expect(runAfterSilent?.frozenResultText).toBe("All work complete, final summary"); + expect(runAfterSilent?.completion?.resultText).toBe("All work complete, final summary"); emitLifecycleEvent("run-refresh-silent", { phase: "end", endedAt: endedAt + 300 }); await flushAsync(); @@ -516,9 +516,11 @@ describe("subagent registry lifecycle error grace", () => { throw new Error("expected capped run to exist"); } expect(run.runId).toBe("run-capped"); - expect(typeof run.frozenResultText).toBe("string"); - expect(run.frozenResultText).toContain("[truncated: frozen completion output exceeded 100KB"); - expect(run.frozenResultCapturedAt).toBeTypeOf("number"); + expect(typeof run.completion?.resultText).toBe("string"); + expect(run.completion?.resultText).toContain( + "[truncated: frozen completion output exceeded 100KB", + ); + expect(run.completion?.capturedAt).toBeTypeOf("number"); }); it("keeps parallel child completion results frozen even when late traffic arrives", async () => { diff --git a/src/agents/subagent-registry.steer-restart.test.ts b/src/agents/subagent-registry.steer-restart.test.ts index 3ab9acb5767..52ab497c39b 100644 --- a/src/agents/subagent-registry.steer-restart.test.ts +++ b/src/agents/subagent-registry.steer-restart.test.ts @@ -67,6 +67,7 @@ vi.mock("../config/sessions.js", () => { const announceSpy = vi.fn(async (_params: unknown) => true); const runSubagentEndedHookMock = vi.fn(async (eventValue?: unknown, _ctx?: unknown) => {}); const emitSessionLifecycleEventMock = vi.fn(); +const removeInternalSessionEffectsTranscriptMock = vi.fn(async (_sessionFile?: string) => {}); function countMatching(items: readonly T[], predicate: (item: T) => boolean) { let count = 0; @@ -153,6 +154,10 @@ vi.mock("./subagent-registry.store.js", () => ({ saveSubagentRegistryToDisk: vi.fn(() => {}), })); +vi.mock("./internal-session-effects.js", () => ({ + removeInternalSessionEffectsTranscript: removeInternalSessionEffectsTranscriptMock, +})); + describe("subagent registry steer restarts", () => { let mod: typeof import("./subagent-registry.js"); type RegisterSubagentRunInput = Parameters[0]; @@ -176,6 +181,7 @@ describe("subagent registry steer restarts", () => { runSubagentEndedHookMock.mockReset(); runSubagentEndedHookMock.mockImplementation(async () => {}); emitSessionLifecycleEventMock.mockReset(); + removeInternalSessionEffectsTranscriptMock.mockClear(); mod.resetSubagentRegistryForTests({ persist: false }); }); @@ -271,11 +277,13 @@ describe("subagent registry steer restarts", () => { previousRunId: string; nextRunId: string; fallback?: ReturnType[number]; + transcriptFile?: string; }) => { const replaced = mod.replaceSubagentRunAfterSteer({ previousRunId: params.previousRunId, nextRunId: params.nextRunId, fallback: params.fallback, + transcriptFile: params.transcriptFile, }); expect(replaced).toBe(true); @@ -294,6 +302,7 @@ describe("subagent registry steer restarts", () => { runSubagentEndedHookMock.mockImplementation(async () => {}); emitSessionLifecycleEventMock.mockReset(); lifecycleHandler = undefined; + removeInternalSessionEffectsTranscriptMock.mockClear(); mod.resetSubagentRegistryForTests({ persist: false }); }); @@ -345,6 +354,37 @@ describe("subagent registry steer restarts", () => { } }); + it("removes orphaned private transcript when steer replaces an internally resumed run", async () => { + { + registerRun({ + runId: "run-old", + childSessionKey: "agent:main:subagent:steer", + task: "initial task", + }); + + const previous = listMainRuns()[0]; + expect(previous?.runId).toBe("run-old"); + if (!previous) { + throw new Error("expected registered subagent run"); + } + previous.execution = { + status: "interrupted", + startedAt: previous.startedAt, + transcriptFile: "/tmp/openclaw-state/internal-agent-runs/run-old.jsonl", + }; + + replaceRunAfterSteer({ + previousRunId: "run-old", + nextRunId: "run-new", + fallback: previous, + }); + + expect(removeInternalSessionEffectsTranscriptMock).toHaveBeenCalledWith( + "/tmp/openclaw-state/internal-agent-runs/run-old.jsonl", + ); + } + }); + it("defers subagent_ended hook for completion-mode runs until announce delivery resolves", async () => { { const resolveAnnounce = createDeferredAnnounceResolver(); @@ -411,8 +451,7 @@ describe("subagent registry steer restarts", () => { const previous = listMainRuns()[0]; expect(previous?.runId).toBe("run-retry-reset-old"); if (previous) { - previous.announceRetryCount = 2; - previous.lastAnnounceRetryAt = Date.now(); + previous.delivery = { status: "pending", attemptCount: 2, lastAttemptAt: Date.now() }; } const run = replaceRunAfterSteer({ @@ -420,8 +459,8 @@ describe("subagent registry steer restarts", () => { nextRunId: "run-retry-reset-new", fallback: previous, }); - expect(run.announceRetryCount).toBeUndefined(); - expect(run.lastAnnounceRetryAt).toBeUndefined(); + expect(run.delivery?.attemptCount).toBeUndefined(); + expect(run.delivery?.lastAttemptAt).toBeUndefined(); } }); @@ -473,8 +512,11 @@ describe("subagent registry steer restarts", () => { const previous = listMainRuns()[0]; expect(previous?.runId).toBe("run-frozen-old"); if (previous) { - previous.frozenResultText = "stale frozen completion"; - previous.frozenResultCapturedAt = Date.now(); + previous.completion = { + required: true, + resultText: "stale frozen completion", + capturedAt: Date.now(), + }; previous.cleanupCompletedAt = Date.now(); previous.cleanupHandled = true; } @@ -485,8 +527,8 @@ describe("subagent registry steer restarts", () => { fallback: previous, }); - expect(run.frozenResultText).toBeUndefined(); - expect(run.frozenResultCapturedAt).toBeUndefined(); + expect(run.completion?.resultText).toBeUndefined(); + expect(run.completion?.capturedAt).toBeUndefined(); expect(run.cleanupCompletedAt).toBeUndefined(); expect(run.cleanupHandled).toBe(false); }); @@ -543,10 +585,13 @@ describe("subagent registry steer restarts", () => { if (!previous) { throw new Error("missing previous run"); } - previous.completionEnqueuedAt = 1_000; - previous.completionDeliveredAt = 2_000; - previous.completionAnnouncedAt = 2_000; - previous.lastAnnounceDropReason = "sink_unavailable"; + previous.delivery = { + status: "delivered", + enqueuedAt: 1_000, + deliveredAt: 2_000, + announcedAt: 2_000, + lastDropReason: "sink_unavailable", + }; const replaced = mod.replaceSubagentRunAfterSteer({ previousRunId: "run-delivery-old", @@ -559,10 +604,10 @@ describe("subagent registry steer restarts", () => { if (!next) { throw new Error("expected replacement run"); } - expect(next.completionEnqueuedAt).toBeUndefined(); - expect(next.completionDeliveredAt).toBeUndefined(); - expect(next.completionAnnouncedAt).toBeUndefined(); - expect(next.lastAnnounceDropReason).toBeUndefined(); + expect(next.delivery?.enqueuedAt).toBeUndefined(); + expect(next.delivery?.deliveredAt).toBeUndefined(); + expect(next.delivery?.announcedAt).toBeUndefined(); + expect(next.delivery?.lastDropReason).toBeUndefined(); }); it("preserves frozen completion as fallback when replacing for wake continuation", () => { @@ -575,8 +620,11 @@ describe("subagent registry steer restarts", () => { const previous = listMainRuns()[0]; expect(previous?.runId).toBe("run-wake-old"); if (previous) { - previous.frozenResultText = "final summary before wake"; - previous.frozenResultCapturedAt = 1234; + previous.completion = { + required: true, + resultText: "final summary before wake", + capturedAt: 1234, + }; } const replaced = mod.replaceSubagentRunAfterSteer({ @@ -591,9 +639,9 @@ describe("subagent registry steer restarts", () => { if (!run) { throw new Error("expected wake replacement run"); } - expect(run.frozenResultText).toBeUndefined(); - expect(run.fallbackFrozenResultText).toBe("final summary before wake"); - expect(run.fallbackFrozenResultCapturedAt).toBe(1234); + expect(run.completion?.resultText).toBeUndefined(); + expect(run.completion?.fallbackResultText).toBe("final summary before wake"); + expect(run.completion?.fallbackCapturedAt).toBe(1234); }); it("restores announce for a finished run when steer replacement dispatch fails", async () => { @@ -786,27 +834,27 @@ describe("subagent registry steer restarts", () => { await vi.advanceTimersByTimeAsync(0); expect(announceSpy).toHaveBeenCalledTimes(1); - expect(listMainRuns()[0]?.announceRetryCount).toBe(1); + expect(listMainRuns()[0]?.delivery?.attemptCount).toBe(1); await vi.advanceTimersByTimeAsync(999); expect(announceSpy).toHaveBeenCalledTimes(1); await vi.advanceTimersByTimeAsync(1); expect(announceSpy).toHaveBeenCalledTimes(2); - expect(listMainRuns()[0]?.announceRetryCount).toBe(2); + expect(listMainRuns()[0]?.delivery?.attemptCount).toBe(2); await vi.advanceTimersByTimeAsync(1_999); expect(announceSpy).toHaveBeenCalledTimes(2); await vi.advanceTimersByTimeAsync(1); expect(announceSpy).toHaveBeenCalledTimes(3); - expect(listMainRuns()[0]?.announceRetryCount).toBe(3); + expect(listMainRuns()[0]?.delivery?.attemptCount).toBe(3); await vi.advanceTimersByTimeAsync(4_001); expect(announceSpy).toHaveBeenCalledTimes(3); await waitForRegistrySideEffect(() => { const run = listMainRuns()[0]; - expect(run?.pendingFinalDelivery).toBe(true); - expect(run?.deliverySuspendedAt).toBeTypeOf("number"); - expect(run?.deliverySuspendedReason).toBe("retry-limit"); + expect(run?.delivery?.status).toBe("suspended"); + expect(run?.delivery?.suspendedAt).toBeTypeOf("number"); + expect(run?.delivery?.suspendedReason).toBe("retry-limit"); expect(run?.cleanupCompletedAt).toBeUndefined(); }); } finally { diff --git a/src/agents/subagent-registry.store.ts b/src/agents/subagent-registry.store.ts index 38fce68e6d9..a317ef738dd 100644 --- a/src/agents/subagent-registry.store.ts +++ b/src/agents/subagent-registry.store.ts @@ -5,6 +5,7 @@ import { resolveStateDir } from "../config/paths.js"; import { loadJsonFile, saveJsonFile } from "../infra/json-file.js"; import { readStringValue } from "../shared/string-coerce.js"; import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js"; +import { normalizeSubagentRunState } from "./subagent-delivery-state.js"; import type { SubagentRunRecord } from "./subagent-registry.types.js"; type PersistedSubagentRegistryV1 = { @@ -148,16 +149,19 @@ export function loadSubagentRegistryFromDisk(): Map { requesterAccountId: _accountId, ...rest } = typed; - out.set(runId, { - ...rest, - childSessionKey, - requesterSessionKey, - controllerSessionKey, - requesterOrigin, - cleanupCompletedAt, - cleanupHandled, - spawnMode: typed.spawnMode === "session" ? "session" : "run", - }); + out.set( + runId, + normalizeSubagentRunState({ + ...rest, + childSessionKey, + requesterSessionKey, + controllerSessionKey, + requesterOrigin, + cleanupCompletedAt, + cleanupHandled, + spawnMode: typed.spawnMode === "session" ? "session" : "run", + }), + ); if (isLegacy) { migrated = true; } @@ -178,7 +182,7 @@ export function saveSubagentRegistryToDisk(runs: Map) const pathname = resolveSubagentRegistryPath(); const serialized: Record = {}; for (const [runId, entry] of runs.entries()) { - serialized[runId] = entry; + serialized[runId] = normalizeSubagentRunState(cloneSubagentRunRecord(entry)); } const out: PersistedSubagentRegistry = { version: REGISTRY_VERSION, diff --git a/src/agents/subagent-registry.test.ts b/src/agents/subagent-registry.test.ts index 6479ca75688..de9f318e2cf 100644 --- a/src/agents/subagent-registry.test.ts +++ b/src/agents/subagent-registry.test.ts @@ -252,8 +252,8 @@ describe("subagent registry seam flow", () => { expectsCompletionMessage: true, createdAt: now - 2, endedAt: now - 1, - pendingFinalDelivery: true, - frozenResultText: "child output", + completion: { required: true, resultText: "child output" }, + delivery: { status: "pending" }, }); mod.addSubagentRunForTests({ runId: "run-complete", @@ -265,7 +265,7 @@ describe("subagent registry seam flow", () => { expectsCompletionMessage: true, createdAt: now - 4, endedAt: now - 3, - completionAnnouncedAt: now - 2, + delivery: { status: "delivered", announcedAt: now - 2, deliveredAt: now - 2 }, cleanupCompletedAt: now - 1, }); @@ -1098,8 +1098,11 @@ describe("subagent registry seam flow", () => { startedAt: Date.parse("2026-03-24T11:59:00Z"), endedAt: Date.parse("2026-03-24T11:59:30Z"), expectsCompletionMessage: true, - announceRetryCount: 3, - lastAnnounceRetryAt: Date.parse("2026-03-24T11:59:40Z"), + delivery: { + status: "pending", + attemptCount: 3, + lastAttemptAt: Date.parse("2026-03-24T11:59:40Z"), + }, }); return 1; }) as never); @@ -1144,21 +1147,23 @@ describe("subagent registry seam flow", () => { endedReason: "subagent-complete", expectsCompletionMessage: true, outcome: { status: "ok" }, - announceRetryCount: 3, - lastAnnounceRetryAt: Date.parse("2026-03-24T11:59:40Z"), - lastAnnounceDeliveryError: "gateway request timeout for agent", - frozenResultText: "child completed successfully", - pendingFinalDelivery: true, - pendingFinalDeliveryPayload: { - requesterSessionKey: "agent:main:main", - requesterDisplayKey: "main", - childSessionKey: "agent:main:subagent:child", - childRunId: "run-resume-keep", - task: "resume keep retry budget", - endedAt: Date.parse("2026-03-24T11:59:30Z"), - outcome: { status: "ok" }, - expectsCompletionMessage: true, - frozenResultText: "child completed successfully", + completion: { required: true, resultText: "child completed successfully" }, + delivery: { + status: "pending", + attemptCount: 3, + lastAttemptAt: Date.parse("2026-03-24T11:59:40Z"), + lastError: "gateway request timeout for agent", + payload: { + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + childSessionKey: "agent:main:subagent:child", + childRunId: "run-resume-keep", + task: "resume keep retry budget", + endedAt: Date.parse("2026-03-24T11:59:30Z"), + outcome: { status: "ok" }, + expectsCompletionMessage: true, + frozenResultText: "child completed successfully", + }, }, }); return 1; @@ -1173,12 +1178,14 @@ describe("subagent registry seam flow", () => { .listSubagentRunsForRequester("agent:main:main") .find((entry) => entry.runId === "run-resume-keep"); expect(run).toMatchObject({ - pendingFinalDelivery: true, - deliverySuspendedReason: "retry-limit", + delivery: { + status: "suspended", + suspendedReason: "retry-limit", + }, cleanupHandled: false, }); expect(run?.cleanupCompletedAt).toBeUndefined(); - expect(run?.pendingFinalDeliveryPayload).toMatchObject({ + expect(run?.delivery?.payload).toMatchObject({ childRunId: "run-resume-keep", frozenResultText: "child completed successfully", }); @@ -1199,27 +1206,26 @@ describe("subagent registry seam flow", () => { endedAt, endedReason: "subagent-complete", outcome: { status: "ok" }, - announceRetryCount: 3, - lastAnnounceRetryAt: endedAt + 1_000, - lastAnnounceDeliveryError: "gateway request timeout for agent", - pendingFinalDelivery: true, - pendingFinalDeliveryCreatedAt: endedAt + 1_000, - pendingFinalDeliveryLastAttemptAt: endedAt + 2_000, - pendingFinalDeliveryAttemptCount: 3, - pendingFinalDeliveryLastError: "gateway request timeout for agent", - pendingFinalDeliveryPayload: { - requesterSessionKey: "agent:main:main", - requesterDisplayKey: "main", - childSessionKey: "agent:main:subagent:reactivated", - childRunId: "run-suspended-old", - task: "reactivate suspended delivery", - endedAt, - outcome: { status: "ok" }, - expectsCompletionMessage: true, - frozenResultText: "child completed successfully", + delivery: { + status: "suspended", + createdAt: endedAt + 1_000, + lastAttemptAt: endedAt + 2_000, + attemptCount: 3, + lastError: "gateway request timeout for agent", + payload: { + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + childSessionKey: "agent:main:subagent:reactivated", + childRunId: "run-suspended-old", + task: "reactivate suspended delivery", + endedAt, + outcome: { status: "ok" }, + expectsCompletionMessage: true, + frozenResultText: "child completed successfully", + }, + suspendedAt: endedAt + 3_000, + suspendedReason: "retry-limit", }, - deliverySuspendedAt: endedAt + 3_000, - deliverySuspendedReason: "retry-limit", }); expect( @@ -1238,11 +1244,10 @@ describe("subagent registry seam flow", () => { cleanupHandled: false, }); expect(replacement?.endedAt).toBeUndefined(); - expect(replacement?.lastAnnounceDeliveryError).toBeUndefined(); - expect(replacement?.pendingFinalDelivery).toBeUndefined(); - expect(replacement?.pendingFinalDeliveryPayload).toBeUndefined(); - expect(replacement?.deliverySuspendedAt).toBeUndefined(); - expect(replacement?.deliverySuspendedReason).toBeUndefined(); + expect(replacement?.delivery?.lastError).toBeUndefined(); + expect(replacement?.delivery?.payload).toBeUndefined(); + expect(replacement?.delivery?.suspendedAt).toBeUndefined(); + expect(replacement?.delivery?.suspendedReason).toBeUndefined(); }); it("finalizes expired delete-mode parents when descendant cleanup retriggers deferred announce handling", async () => { @@ -1699,25 +1704,26 @@ describe("subagent registry seam flow", () => { startedAt: now - 3 * 60 * 60_000, endedAt: now - 3 * 60 * 60_000, outcome: { status: "ok" }, - pendingFinalDelivery: true, - pendingFinalDeliveryCreatedAt: now - 3 * 60 * 60_000, - pendingFinalDeliveryLastAttemptAt: now - 2 * 60 * 60_000 - 1, - pendingFinalDeliveryAttemptCount: 3, - pendingFinalDeliveryLastError: "gateway request timeout for agent", - pendingFinalDeliveryPayload: { - requesterSessionKey: "agent:main:cron:cron-1:run:parent", - requesterDisplayKey: "cron", - childSessionKey: "agent:main:subagent:suspended-cron", - childRunId: runId, - task: "cron suspended delivery", - endedAt: now - 3 * 60 * 60_000, - outcome: { status: "ok" }, - expectsCompletionMessage: true, - frozenResultText: "large final payload", + delivery: { + status: "suspended", + createdAt: now - 3 * 60 * 60_000, + lastAttemptAt: now - 2 * 60 * 60_000 - 1, + attemptCount: 3, + lastError: "gateway request timeout for agent", + payload: { + requesterSessionKey: "agent:main:cron:cron-1:run:parent", + requesterDisplayKey: "cron", + childSessionKey: "agent:main:subagent:suspended-cron", + childRunId: runId, + task: "cron suspended delivery", + endedAt: now - 3 * 60 * 60_000, + outcome: { status: "ok" }, + expectsCompletionMessage: true, + frozenResultText: "large final payload", + }, + suspendedAt: now - 2 * 60 * 60_000 - 1, + suspendedReason: "retry-limit", }, - deliverySuspendedAt: now - 2 * 60 * 60_000 - 1, - deliverySuspendedReason: "retry-limit", - lastAnnounceDeliveryError: "gateway request timeout for agent", }); await mod.testing.sweepOnceForTests(); @@ -1725,16 +1731,18 @@ describe("subagent registry seam flow", () => { const run = mod.getSubagentRunByChildSessionKey("agent:main:subagent:suspended-cron"); expect(run).toMatchObject({ runId, - pendingFinalDelivery: undefined, - pendingFinalDeliveryPayload: undefined, - deliverySuspendedAt: undefined, - deliverySuspendedReason: undefined, - deliveryDiscardedAt: now, - deliveryDiscardReason: "expired", + delivery: { + status: "discarded", + payload: undefined, + suspendedAt: undefined, + suspendedReason: undefined, + discardedAt: now, + discardReason: "expired", + }, cleanupHandled: true, cleanupCompletedAt: now, }); - expect(run?.deliveryDiscardedPayloadSummary).toEqual({ + expect(run?.delivery?.discardedPayloadSummary).toEqual({ requesterSessionKey: "agent:main:cron:cron-1:run:parent", childSessionKey: "agent:main:subagent:suspended-cron", childRunId: runId, @@ -1770,24 +1778,26 @@ describe("subagent registry seam flow", () => { startedAt: now - 60_000, endedAt: now - 60_000, outcome: { status: "ok" }, - pendingFinalDelivery: true, - pendingFinalDeliveryCreatedAt: now - 60_000, - pendingFinalDeliveryLastAttemptAt: now - 60_000 + i, - pendingFinalDeliveryAttemptCount: 3, - pendingFinalDeliveryLastError: "gateway request timeout for agent", - pendingFinalDeliveryPayload: { - requesterSessionKey: "agent:main:telegram:direct:418181497", - requesterDisplayKey: "telegram", - childSessionKey: `agent:main:subagent:suspended-pressure-${i}`, - childRunId: runId, - task: "interactive suspended delivery", - endedAt: now - 60_000, - outcome: { status: "ok" }, - expectsCompletionMessage: true, - frozenResultText: "final payload", + delivery: { + status: "suspended", + createdAt: now - 60_000, + lastAttemptAt: now - 60_000 + i, + attemptCount: 3, + lastError: "gateway request timeout for agent", + payload: { + requesterSessionKey: "agent:main:telegram:direct:418181497", + requesterDisplayKey: "telegram", + childSessionKey: `agent:main:subagent:suspended-pressure-${i}`, + childRunId: runId, + task: "interactive suspended delivery", + endedAt: now - 60_000, + outcome: { status: "ok" }, + expectsCompletionMessage: true, + frozenResultText: "final payload", + }, + suspendedAt: now - 60_000 + i, + suspendedReason: "retry-limit", }, - deliverySuspendedAt: now - 60_000 + i, - deliverySuspendedReason: "retry-limit", }); } @@ -1796,15 +1806,16 @@ describe("subagent registry seam flow", () => { const runs = Array.from({ length: 51 }, (_, i) => mod.getSubagentRunByChildSessionKey(`agent:main:subagent:suspended-pressure-${i}`), ); - const discarded = runs.filter((run) => run?.deliveryDiscardReason === "pressure-pruned"); + const discarded = runs.filter((run) => run?.delivery?.discardReason === "pressure-pruned"); const stillSuspended = runs.filter( - (run) => run?.pendingFinalDelivery === true && typeof run.deliverySuspendedAt === "number", + (run) => + run?.delivery?.status === "suspended" && typeof run.delivery.suspendedAt === "number", ); expect(discarded).toHaveLength(41); expect(stillSuspended).toHaveLength(10); expect(discarded[0]?.runId).toBe("run-suspended-pressure-0"); - expect(runs[40]?.deliveryDiscardReason).toBe("pressure-pruned"); - expect(runs[41]?.pendingFinalDelivery).toBe(true); + expect(runs[40]?.delivery?.discardReason).toBe("pressure-pruned"); + expect(runs[41]?.delivery?.status).toBe("suspended"); expect(mocks.persistSubagentRunsToDisk).toHaveBeenCalled(); }); }); diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index 7473af82936..95a32b299f5 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -11,7 +11,16 @@ import { createLazyImportLoader, createLazyPromiseLoader } from "../shared/lazy- import { importRuntimeModule } from "../shared/runtime-import.js"; import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js"; import type { DeliveryContext } from "../utils/delivery-context.types.js"; +import { removeInternalSessionEffectsTranscript } from "./internal-session-effects.js"; import type { ensureRuntimePluginsLoaded as ensureRuntimePluginsLoadedFn } from "./runtime-plugins.js"; +import { + ensureCompletionState, + ensureDeliveryState, + getDeliveryAttemptCount, + getDeliveryLastAttemptAt, + getDeliveryLastError, + isDeliverySuspended, +} from "./subagent-delivery-state.js"; import { SUBAGENT_ENDED_REASON_COMPLETE, SUBAGENT_ENDED_REASON_ERROR, @@ -517,18 +526,14 @@ function resumeSubagentRun(runId: string) { if (entry.cleanupCompletedAt) { return; } - if ( - typeof entry.endedAt === "number" && - entry.pendingFinalDelivery === true && - typeof entry.deliverySuspendedAt === "number" - ) { + if (typeof entry.endedAt === "number" && isDeliverySuspended(entry)) { return; } if (entry.pauseReason === "sessions_yield") { return; } // Skip entries that have exhausted their retry budget or expired (#18264). - if ((entry.announceRetryCount ?? 0) >= MAX_ANNOUNCE_RETRY_COUNT) { + if (getDeliveryAttemptCount(entry) >= MAX_ANNOUNCE_RETRY_COUNT) { void finalizeResumedAnnounceGiveUp({ runId, entry, @@ -550,13 +555,10 @@ function resumeSubagentRun(runId: string) { } const now = Date.now(); - const delayMs = resolveAnnounceRetryDelayMs(entry.announceRetryCount ?? 0); - const earliestRetryAt = (entry.lastAnnounceRetryAt ?? 0) + delayMs; - if ( - entry.expectsCompletionMessage === true && - entry.lastAnnounceRetryAt && - now < earliestRetryAt - ) { + const lastAttemptAt = getDeliveryLastAttemptAt(entry); + const delayMs = resolveAnnounceRetryDelayMs(getDeliveryAttemptCount(entry)); + const earliestRetryAt = (lastAttemptAt ?? 0) + delayMs; + if (entry.expectsCompletionMessage === true && lastAttemptAt && now < earliestRetryAt) { const waitMs = Math.max(1, earliestRetryAt - now); const scheduledEntry = entry; const timer = setTimeout(() => { @@ -677,11 +679,7 @@ function stopSweeper() { } function isSuspendedPendingFinalDelivery(entry: SubagentRunRecord): boolean { - return ( - typeof entry.endedAt === "number" && - entry.pendingFinalDelivery === true && - typeof entry.deliverySuspendedAt === "number" - ); + return typeof entry.endedAt === "number" && isDeliverySuspended(entry); } function resolveSuspendedDeliveryExpiryMs(entry: SubagentRunRecord): number { @@ -701,30 +699,32 @@ async function discardSuspendedPendingFinalDelivery( now: number, reason: "expired" | "pressure-pruned", ): Promise { - const payload = entry.pendingFinalDeliveryPayload; - entry.deliveryDiscardedAt = now; - entry.deliveryDiscardReason = reason; - entry.deliveryDiscardedPayloadSummary = { + const delivery = ensureDeliveryState(entry); + const payload = delivery.payload; + delivery.status = "discarded"; + delivery.discardedAt = now; + delivery.discardReason = reason; + delivery.discardedPayloadSummary = { requesterSessionKey: payload?.requesterSessionKey ?? entry.requesterSessionKey, childSessionKey: payload?.childSessionKey ?? entry.childSessionKey, childRunId: payload?.childRunId ?? entry.runId, endedAt: payload?.endedAt ?? entry.endedAt, status: payload?.outcome?.status ?? entry.outcome?.status, - lastError: entry.lastAnnounceDeliveryError ?? entry.pendingFinalDeliveryLastError ?? null, + lastError: getDeliveryLastError(entry) ?? null, }; - entry.pendingFinalDelivery = undefined; - entry.pendingFinalDeliveryCreatedAt = undefined; - entry.pendingFinalDeliveryLastAttemptAt = undefined; - entry.pendingFinalDeliveryAttemptCount = undefined; - entry.pendingFinalDeliveryLastError = undefined; - entry.pendingFinalDeliveryPayload = undefined; - entry.deliverySuspendedAt = undefined; - entry.deliverySuspendedReason = undefined; + delivery.payload = undefined; + delivery.createdAt = undefined; + delivery.lastAttemptAt = undefined; + delivery.attemptCount = undefined; + delivery.lastError = undefined; + delivery.suspendedAt = undefined; + delivery.suspendedReason = undefined; entry.wakeOnDescendantSettle = undefined; - entry.fallbackFrozenResultText = undefined; - entry.fallbackFrozenResultCapturedAt = undefined; + const completion = ensureCompletionState(entry); + completion.fallbackResultText = undefined; + completion.fallbackCapturedAt = undefined; entry.cleanupHandled = true; - entry.completionAnnouncedAt = undefined; + delivery.announcedAt = undefined; resumedRuns.delete(runId); clearPendingLifecycleError(runId); clearPendingLifecycleTimeout(runId); @@ -738,6 +738,7 @@ async function discardSuspendedPendingFinalDelivery( if (shouldDeleteAttachments) { await safeRemoveAttachmentsDir(entry); } + await removeInternalSessionEffectsTranscript(entry.execution?.transcriptFile); const completionReason = entry.endedReason ?? SUBAGENT_ENDED_REASON_COMPLETE; completeCleanupBookkeeping({ runId, @@ -779,7 +780,7 @@ async function sweepSubagentRuns() { suspendedEntries.length - SUSPENDED_DELIVERY_PRESSURE_TARGET, ); for (const [runId] of suspendedEntries - .toSorted((a, b) => (a[1].deliverySuspendedAt ?? 0) - (b[1].deliverySuspendedAt ?? 0)) + .toSorted((a, b) => (a[1].delivery?.suspendedAt ?? 0) - (b[1].delivery?.suspendedAt ?? 0)) .slice(0, pressureCount)) { pressureDiscardRunIds.add(runId); } @@ -793,7 +794,7 @@ async function sweepSubagentRuns() { } for (const [runId, entry] of subagentRuns.entries()) { if (isSuspendedPendingFinalDelivery(entry)) { - const suspendedAgeMs = now - (entry.deliverySuspendedAt ?? now); + const suspendedAgeMs = now - (entry.delivery?.suspendedAt ?? now); const expired = suspendedAgeMs >= resolveSuspendedDeliveryExpiryMs(entry); if (expired || pressureDiscardRunIds.has(runId)) { await discardSuspendedPendingFinalDelivery( @@ -1093,6 +1094,7 @@ export function replaceSubagentRunAfterSteer(params: { fallback?: SubagentRunRecord; runTimeoutSeconds?: number; preserveFrozenResultFallback?: boolean; + transcriptFile?: string; }) { return subagentRunManager.replaceSubagentRunAfterSteer(params); } diff --git a/src/agents/subagent-registry.types.ts b/src/agents/subagent-registry.types.ts index ada495612e2..54b82f55376 100644 --- a/src/agents/subagent-registry.types.ts +++ b/src/agents/subagent-registry.types.ts @@ -21,6 +21,56 @@ export type PendingFinalDeliveryPayload = { wakeOnDescendantSettle?: boolean; }; +export type SubagentExecutionState = { + status: "running" | "interrupted" | "terminal"; + startedAt?: number; + endedAt?: number; + outcome?: SubagentRunOutcome; + interruptedAt?: number; + interruptionReason?: "gateway-restart" | "lost-execution-context"; + transcriptFile?: string; +}; + +export type SubagentCompletionState = { + required: boolean; + resultText?: string | null; + capturedAt?: number; + fallbackResultText?: string | null; + fallbackCapturedAt?: number; +}; + +export type SubagentCompletionDeliveryState = { + status: + | "not_required" + | "pending" + | "in_progress" + | "delivered" + | "failed" + | "suspended" + | "discarded"; + payload?: PendingFinalDeliveryPayload; + createdAt?: number; + enqueuedAt?: number; + deliveredAt?: number; + announcedAt?: number; + lastAttemptAt?: number; + attemptCount?: number; + lastError?: string | null; + suspendedAt?: number; + suspendedReason?: "retry-limit" | "expiry"; + discardedAt?: number; + discardReason?: "expired" | "pressure-pruned"; + discardedPayloadSummary?: { + requesterSessionKey?: string; + childSessionKey?: string; + childRunId?: string; + endedAt?: number; + status?: string; + lastError?: string | null; + }; + lastDropReason?: "queue_cap" | "parent_run_ended" | "sink_unavailable" | "dedupe"; +}; + export type SubagentRunRecord = { runId: string; childSessionKey: string; @@ -48,41 +98,15 @@ export type SubagentRunRecord = { cleanupHandled?: boolean; suppressAnnounceReason?: "steer-restart" | "killed"; expectsCompletionMessage?: boolean; - announceRetryCount?: number; - lastAnnounceRetryAt?: number; - lastAnnounceDeliveryError?: string; endedReason?: SubagentLifecycleEndedReason; pauseReason?: "sessions_yield"; wakeOnDescendantSettle?: boolean; - frozenResultText?: string | null; - frozenResultCapturedAt?: number; - fallbackFrozenResultText?: string | null; - fallbackFrozenResultCapturedAt?: number; + execution?: SubagentExecutionState; + completion?: SubagentCompletionState; /** Set after the subagent_ended hook has been emitted successfully once. */ endedHookEmittedAt?: number; - /** Durable marker that final user delivery still needs a retry/resume pass. */ - pendingFinalDelivery?: boolean; - pendingFinalDeliveryCreatedAt?: number; - pendingFinalDeliveryLastAttemptAt?: number; - pendingFinalDeliveryAttemptCount?: number; - pendingFinalDeliveryLastError?: string | null; - pendingFinalDeliveryPayload?: PendingFinalDeliveryPayload; - deliverySuspendedAt?: number; - deliverySuspendedReason?: "retry-limit" | "expiry"; - deliveryDiscardedAt?: number; - deliveryDiscardReason?: "expired" | "pressure-pruned"; - deliveryDiscardedPayloadSummary?: { - requesterSessionKey?: string; - childSessionKey?: string; - childRunId?: string; - endedAt?: number; - status?: string; - lastError?: string | null; - }; - completionEnqueuedAt?: number; - completionDeliveredAt?: number; - completionAnnouncedAt?: number; - lastAnnounceDropReason?: "queue_cap" | "parent_run_ended" | "sink_unavailable" | "dedupe"; + /** Durable outbox marker for parent/external completion delivery. */ + delivery?: SubagentCompletionDeliveryState; attachmentsDir?: string; attachmentsRootDir?: string; retainAttachmentsOnKeep?: boolean; diff --git a/src/cron/isolated-agent/subagent-followup.test.ts b/src/cron/isolated-agent/subagent-followup.test.ts index 1be6cd40cf1..98672c8737c 100644 --- a/src/cron/isolated-agent/subagent-followup.test.ts +++ b/src/cron/isolated-agent/subagent-followup.test.ts @@ -45,7 +45,8 @@ function createDescendantRun(params?: { task?: string; cleanup?: "keep" | "delete"; endedAt?: number; - frozenResultText?: string | null; + resultText?: string | null; + executionTranscriptFile?: string; }) { return { runId: params?.runId ?? "run-1", @@ -56,9 +57,17 @@ function createDescendantRun(params?: { cleanup: params?.cleanup ?? "keep", createdAt: 1000, endedAt: params?.endedAt ?? 2000, - ...(params?.frozenResultText === undefined + ...(params?.resultText === undefined ? {} - : { frozenResultText: params.frozenResultText }), + : { completion: { required: true, resultText: params.resultText } }), + ...(params?.executionTranscriptFile + ? { + execution: { + status: "terminal" as const, + transcriptFile: params.executionTranscriptFile, + }, + } + : {}), }; } @@ -127,7 +136,7 @@ describe("readDescendantSubagentFallbackReply", () => { vi.mocked(listDescendantRunsForRequester).mockReturnValue([ createDescendantRun({ cleanup: "delete", - frozenResultText: "frozen child output", + resultText: "frozen child output", }), ]); vi.mocked(readLatestAssistantReply).mockResolvedValue(undefined); @@ -140,7 +149,7 @@ describe("readDescendantSubagentFallbackReply", () => { it("prefers session transcript over frozenResultText", async () => { vi.mocked(listDescendantRunsForRequester).mockReturnValue([ - createDescendantRun({ frozenResultText: "frozen text" }), + createDescendantRun({ resultText: "frozen text" }), ]); vi.mocked(readLatestAssistantReply).mockResolvedValue("live transcript text"); const result = await readDescendantSubagentFallbackReply({ @@ -150,15 +159,47 @@ describe("readDescendantSubagentFallbackReply", () => { expect(result).toBe("live transcript text"); }); + it("prefers captured completion for internally resumed descendants", async () => { + vi.mocked(listDescendantRunsForRequester).mockReturnValue([ + createDescendantRun({ + resultText: "fresh recovered output", + executionTranscriptFile: "/tmp/openclaw-internal-run.jsonl", + }), + ]); + vi.mocked(readLatestAssistantReply).mockResolvedValue("stale visible transcript"); + const result = await readDescendantSubagentFallbackReply({ + sessionKey: "test-session", + runStartedAt, + }); + expect(result).toBe("fresh recovered output"); + }); + + it("does not fall back to visible transcript for internally resumed descendants without captured output", async () => { + vi.mocked(listDescendantRunsForRequester).mockReturnValue([ + createDescendantRun({ + resultText: null, + executionTranscriptFile: "/tmp/openclaw-empty-internal-run.jsonl", + }), + ]); + vi.mocked(readLatestAssistantReply).mockClear(); + vi.mocked(readLatestAssistantReply).mockResolvedValue("stale visible transcript"); + const result = await readDescendantSubagentFallbackReply({ + sessionKey: "test-session", + runStartedAt, + }); + expect(result).toBeUndefined(); + expect(readLatestAssistantReply).not.toHaveBeenCalled(); + }); + it("joins replies from multiple descendants", async () => { vi.mocked(listDescendantRunsForRequester).mockReturnValue([ - createDescendantRun({ frozenResultText: "first child output" }), + createDescendantRun({ resultText: "first child output" }), createDescendantRun({ runId: "run-2", childSessionKey: "child-2", task: "task-2", endedAt: 3000, - frozenResultText: "second child output", + resultText: "second child output", }), ]); vi.mocked(readLatestAssistantReply).mockResolvedValue(undefined); @@ -177,7 +218,7 @@ describe("readDescendantSubagentFallbackReply", () => { childSessionKey: "child-2", task: "task-2", endedAt: 3000, - frozenResultText: "useful output", + resultText: "useful output", }), ]); vi.mocked(readLatestAssistantReply).mockImplementation(async (params) => { @@ -193,11 +234,11 @@ describe("readDescendantSubagentFallbackReply", () => { expect(result).toBe("useful output"); }); - it("returns undefined when frozenResultText is null", async () => { + it("returns undefined when completion result is null", async () => { vi.mocked(listDescendantRunsForRequester).mockReturnValue([ createDescendantRun({ cleanup: "delete", - frozenResultText: null, + resultText: null, }), ]); vi.mocked(readLatestAssistantReply).mockResolvedValue(undefined); @@ -219,7 +260,7 @@ describe("readDescendantSubagentFallbackReply", () => { cleanup: "keep", createdAt: 500, endedAt: 900, - frozenResultText: "stale output from previous run", + completion: { required: true, resultText: "stale output from previous run" }, }, ]); vi.mocked(readLatestAssistantReply).mockResolvedValue(undefined); diff --git a/src/cron/isolated-agent/subagent-followup.ts b/src/cron/isolated-agent/subagent-followup.ts index 7e7c0320959..67e7aa5c8e0 100644 --- a/src/cron/isolated-agent/subagent-followup.ts +++ b/src/cron/isolated-agent/subagent-followup.ts @@ -45,11 +45,21 @@ export async function readDescendantSubagentFallbackReply(params: { .toSorted((a, b) => (a.endedAt ?? 0) - (b.endedAt ?? 0)) .slice(-4); for (const entry of latestRuns) { - let reply = (await readLatestAssistantReply({ sessionKey: entry.childSessionKey }))?.trim(); + const frozenResultText = entry.completion?.resultText; + const frozenReply = + typeof frozenResultText === "string" && frozenResultText.trim() + ? frozenResultText.trim() + : undefined; + const usesInternalTranscript = typeof entry.execution?.transcriptFile === "string"; + let reply = usesInternalTranscript ? frozenReply : undefined; + if (!reply && !usesInternalTranscript) { + reply = (await readLatestAssistantReply({ sessionKey: entry.childSessionKey }))?.trim(); + } // Fall back to the registry's frozen result text when the session transcript - // is unavailable (e.g. child session already deleted by announce cleanup). - if (!reply && typeof entry.frozenResultText === "string" && entry.frozenResultText.trim()) { - reply = entry.frozenResultText.trim(); + // is unavailable (e.g. child session already deleted by announce cleanup) or + // intentionally bypassed by an internal interrupted-resume run. + if (!reply && frozenReply) { + reply = frozenReply; } if (!reply || reply.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) { continue; diff --git a/src/gateway/call.test.ts b/src/gateway/call.test.ts index 95d239c14b3..8d5c2217718 100644 --- a/src/gateway/call.test.ts +++ b/src/gateway/call.test.ts @@ -666,6 +666,28 @@ describe("callGateway url resolution", () => { expect(lastClientOptions?.clientDisplayName).toBe("gateway:sessions.delete"); }); + it("sends internal agent handoffs as backend gateway calls", async () => { + setLocalLoopbackGatewayConfig(); + helloMethods = ["agent"]; + + await callGateway({ + method: "agent", + params: { + message: "resume", + sessionEffects: "internal", + suppressPromptPersistence: true, + }, + }); + + expect(lastClientOptions?.clientName).toBe(GATEWAY_CLIENT_NAMES.GATEWAY_CLIENT); + expect(lastClientOptions?.mode).toBe(GATEWAY_CLIENT_MODES.BACKEND); + expect(lastRequestOptions?.method).toBe("agent"); + expect(lastRequestOptions?.params).toMatchObject({ + sessionEffects: "internal", + suppressPromptPersistence: true, + }); + }); + it("passes approval runtime tokens to backend gateway clients", async () => { setLocalLoopbackGatewayConfig(); diff --git a/src/gateway/protocol/schema/agent.ts b/src/gateway/protocol/schema/agent.ts index ab8e24f2ea4..67e54cab700 100644 --- a/src/gateway/protocol/schema/agent.ts +++ b/src/gateway/protocol/schema/agent.ts @@ -194,6 +194,8 @@ export const AgentParamsSchema = Type.Object( internalRuntimeHandoffId: Type.Optional(NonEmptyString), internalEvents: Type.Optional(Type.Array(AgentInternalEventSchema)), inputProvenance: Type.Optional(InputProvenanceSchema), + suppressPromptPersistence: Type.Optional(Type.Boolean()), + sessionEffects: Type.Optional(Type.Union([Type.Literal("visible"), Type.Literal("internal")])), sourceReplyDeliveryMode: Type.Optional( Type.Union([Type.Literal("automatic"), Type.Literal("message_tool_only")]), ), diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index a71357ce24f..95ee44de7b2 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -1639,6 +1639,66 @@ describe("gateway agent handler", () => { expect(callArgs.message).toContain("sourceTool=subagent_announce"); }); + it("rejects public internal session-effect controls", async () => { + primeMainAgentRun({ cfg: mocks.loadConfigReturn }); + mocks.agentCommand.mockClear(); + + for (const params of [ + { sessionEffects: "internal" as const, idempotencyKey: "test-public-internal-effects" }, + { suppressPromptPersistence: true, idempotencyKey: "test-public-prompt-suppress" }, + ]) { + const respond = await invokeAgent( + { + message: "forged internal control", + agentId: "main", + sessionKey: "agent:main:main", + ...params, + }, + { reqId: params.idempotencyKey, flushDispatch: false }, + ); + + expectRespondError(respond, { + message: "internal session-effect controls are reserved for backend callers.", + }); + } + expect(mocks.agentCommand).not.toHaveBeenCalled(); + }); + + it("keeps backend internal session-effect runs out of visible gateway state", async () => { + primeMainAgentRun({ cfg: mocks.loadConfigReturn }); + mocks.agentCommand.mockClear(); + mocks.registerAgentRunContext.mockClear(); + const context = makeContext(); + + await invokeAgent( + { + message: "internal resume", + agentId: "main", + sessionKey: "agent:main:main", + sessionEffects: "internal", + suppressPromptPersistence: true, + idempotencyKey: "test-backend-internal-effects", + }, + { + reqId: "backend-internal-effects", + client: backendGatewayClient(), + context, + }, + ); + + const callArgs = await waitForAgentCommandCall<{ + sessionEffects?: string; + suppressPromptPersistence?: boolean; + }>(); + expect(callArgs.sessionEffects).toBe("internal"); + expect(callArgs.suppressPromptPersistence).toBe(true); + expect(mocks.updateSessionStore).not.toHaveBeenCalled(); + expect(context.addChatRun).not.toHaveBeenCalled(); + expect(mocks.registerAgentRunContext).toHaveBeenCalledWith("test-backend-internal-effects", { + isControlUiVisible: false, + }); + }); + it("rejects public transcriptMessage overrides", async () => { primeMainAgentRun({ cfg: mocks.loadConfigReturn }); mocks.agentCommand.mockClear(); diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 2fd1dafd57e..3023ca58b01 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -767,6 +767,8 @@ export const agentHandlers: GatewayRequestHandlers = { acpTurnSource?: "manual_spawn"; internalRuntimeHandoffId?: string; internalEvents?: AgentInternalEvent[]; + suppressPromptPersistence?: boolean; + sessionEffects?: "visible" | "internal"; idempotencyKey: string; sourceReplyDeliveryMode?: "automatic" | "message_tool_only"; disableMessageTool?: boolean; @@ -782,6 +784,8 @@ export const agentHandlers: GatewayRequestHandlers = { const canResetSession = resolveCanResetSessionFromClient(client); const canUseInternalRuntimeHandoff = resolveCanUseInternalRuntimeHandoff(client); const requestedModelOverride = Boolean(request.provider || request.model); + const requestedInternalSessionEffects = request.sessionEffects === "internal"; + const requestedPromptPersistenceSuppression = request.suppressPromptPersistence === true; const isRawModelRun = request.modelRun === true || request.promptMode === "none"; if (requestedModelOverride && !allowModelOverride) { respond( @@ -794,6 +798,20 @@ export const agentHandlers: GatewayRequestHandlers = { ); return; } + if ( + (requestedInternalSessionEffects || requestedPromptPersistenceSuppression) && + !canUseInternalRuntimeHandoff + ) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + "internal session-effect controls are reserved for backend callers.", + ), + ); + return; + } const providerOverride = allowModelOverride ? request.provider : undefined; const modelOverride = allowModelOverride ? request.model : undefined; const cfg = context.getRuntimeConfig(); @@ -821,6 +839,8 @@ export const agentHandlers: GatewayRequestHandlers = { let resolvedGroupSpace: string | undefined = normalizedSpawned.groupSpace; let spawnedByValue: string | undefined; const inputProvenance = normalizeInputProvenance(request.inputProvenance); + const sessionEffects = requestedInternalSessionEffects ? "internal" : request.sessionEffects; + const suppressVisibleSessionEffects = sessionEffects === "internal"; const agentDedupeKeys = resolveAgentDedupeKeys({ idempotencyKey: idem, execApprovalFollowupApprovalId, @@ -1445,7 +1465,7 @@ export const agentHandlers: GatewayRequestHandlers = { agentId, }).sessionStartedAt : undefined; - if (storePath) { + if (storePath && !suppressVisibleSessionEffects) { const requestedStoreKey = requestedSessionKey; let deniedBySendPolicy = false; const persisted = await updateSessionStore(storePath, (store) => { @@ -1514,7 +1534,10 @@ export const agentHandlers: GatewayRequestHandlers = { return; } } - if (canonicalSessionKey === mainSessionKey || canonicalSessionKey === "global") { + if ( + !suppressVisibleSessionEffects && + (canonicalSessionKey === mainSessionKey || canonicalSessionKey === "global") + ) { context.addChatRun(idem, { sessionKey: canonicalSessionKey, clientRunId: idem, @@ -1523,7 +1546,12 @@ export const agentHandlers: GatewayRequestHandlers = { bestEffortDeliver = true; } } - registerAgentRunContext(idem, { sessionKey: canonicalSessionKey }); + registerAgentRunContext( + idem, + suppressVisibleSessionEffects + ? { isControlUiVisible: false } + : { sessionKey: canonicalSessionKey }, + ); } const connId = typeof client?.connId === "string" ? client.connId : undefined; @@ -1921,12 +1949,15 @@ export const agentHandlers: GatewayRequestHandlers = { acpTurnSource: request.acpTurnSource, internalEvents: request.internalEvents, inputProvenance, + sessionEffects, sourceReplyDeliveryMode: request.sourceReplyDeliveryMode, disableMessageTool: request.disableMessageTool, - suppressPromptPersistence: shouldSuppressAgentPromptPersistence({ - inputProvenance, - internalEvents: request.internalEvents, - }), + suppressPromptPersistence: + requestedPromptPersistenceSuppression || + shouldSuppressAgentPromptPersistence({ + inputProvenance, + internalEvents: request.internalEvents, + }), cleanupBundleMcpOnRunEnd: request.cleanupBundleMcpOnRunEnd, abortSignal: activeRunAbort.controller.signal, onActiveModelSelected: ({ provider }) => {