refactor: split subagent delivery state

This commit is contained in:
Peter Steinberger
2026-05-23 23:43:03 +01:00
parent 3cf806d172
commit d73f3ac85d
35 changed files with 1588 additions and 613 deletions

View File

@@ -45,6 +45,8 @@ const state = vi.hoisted(() => ({
resolvedSkills: [],
version: 0,
})),
prepareInternalSessionEffectsTranscriptMock: vi.fn(),
removeInternalSessionEffectsTranscriptMock: vi.fn(),
authProfileStoreMock: { profiles: {} } as { profiles: Record<string, unknown> },
sessionEntryMock: undefined as unknown,
sessionStoreMock: undefined as unknown,
@@ -211,6 +213,13 @@ vi.mock("../config/sessions/transcript-resolve.runtime.js", () => ({
}),
}));
vi.mock("./internal-session-effects.js", () => ({
prepareInternalSessionEffectsTranscript: (...args: unknown[]) =>
state.prepareInternalSessionEffectsTranscriptMock(...args),
removeInternalSessionEffectsTranscript: (...args: unknown[]) =>
state.removeInternalSessionEffectsTranscriptMock(...args),
}));
vi.mock("../infra/agent-events.js", () => ({
clearAgentRunContext: (...args: unknown[]) => state.clearAgentRunContextMock(...args),
emitAgentEvent: (...args: unknown[]) => state.emitAgentEventMock(...args),
@@ -808,6 +817,10 @@ describe("agentCommand LiveSessionModelSwitchError retry", () => {
state.deliverAgentCommandResultMock.mockResolvedValue(undefined);
state.updateSessionStoreAfterAgentRunMock.mockResolvedValue(undefined);
state.trajectoryFlushMock.mockResolvedValue(undefined);
state.prepareInternalSessionEffectsTranscriptMock.mockResolvedValue(
"/tmp/openclaw-internal-run.jsonl",
);
state.removeInternalSessionEffectsTranscriptMock.mockResolvedValue(undefined);
});
afterEach(() => {
@@ -900,6 +913,46 @@ describe("agentCommand LiveSessionModelSwitchError retry", () => {
expect(stored?.pendingFinalDeliveryIntentId).toBeUndefined();
});
// Runs agentCommand with `sessionEffects: "internal"` and checks that the attempt
// executes against the prepared private transcript while the visible session
// entry and session store are left exactly as they were.
it("keeps internal session-effect CLI runs out of visible session state", async () => {
setupSingleAttemptFallback();
// Visible session entry carrying user-chosen overrides and a skills snapshot.
const visibleEntry: SessionEntry = {
sessionId: "session-1",
updatedAt: 1,
sessionFile: "/tmp/session.jsonl",
providerOverride: "anthropic",
modelOverride: "claude",
modelOverrideSource: "user",
skillsSnapshot: { prompt: "visible", skills: [{ name: "existing" }], version: 1 },
};
const sessionStore: Record<string, SessionEntry> = { "agent:main:main": visibleEntry };
state.sessionEntryMock = visibleEntry;
state.sessionStoreMock = sessionStore;
state.storePathMock = "/tmp/openclaw-session-store.json";
// Capture the parameters handed to each attempt so they can be inspected below.
const attemptCalls: Array<{ sessionFile?: string; sessionEntry?: SessionEntry }> = [];
state.runAgentAttemptMock.mockImplementation(async (params) => {
attemptCalls.push(params as { sessionFile?: string; sessionEntry?: SessionEntry });
return makeSuccessResult("openai", "gpt-5.4");
});
await agentCommand({
message: "internal resume",
to: "+1234567890",
sessionEffects: "internal",
suppressPromptPersistence: true,
});
// The private transcript is prepared from the visible session file.
expect(state.prepareInternalSessionEffectsTranscriptMock).toHaveBeenCalledWith({
sessionFile: "/tmp/session.jsonl",
runId: expect.any(String),
});
// The single attempt runs on the private path (the value the prepare mock resolves to)
// but still receives the same visible entry object.
expect(attemptCalls).toHaveLength(1);
expect(attemptCalls[0]?.sessionFile).toBe("/tmp/openclaw-internal-run.jsonl");
expect(attemptCalls[0]?.sessionEntry).toBe(visibleEntry);
// Nothing is persisted, and the store still holds the identical entry object.
expect(state.persistSessionEntryMock).not.toHaveBeenCalled();
expect(state.updateSessionStoreAfterAgentRunMock).not.toHaveBeenCalled();
expect(sessionStore["agent:main:main"]).toBe(visibleEntry);
});
it("does not duplicate finishing lifecycle when an attempt already emitted finishing", async () => {
setupModelSwitchRetry({
provider: "openai",

View File

@@ -74,6 +74,7 @@ import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "./defaults.js";
import { resolveFastModeState } from "./fast-mode.js";
import { ensureSelectedAgentHarnessPlugin } from "./harness/runtime-plugin.js";
import { resolveAvailableAgentHarnessPolicy } from "./harness/selection.js";
import { prepareInternalSessionEffectsTranscript } from "./internal-session-effects.js";
import { AGENT_LANE_SUBAGENT } from "./lanes.js";
import { LiveSessionModelSwitchError } from "./live-model-switch.js";
import { loadManifestModelCatalog } from "./model-catalog.js";
@@ -529,6 +530,7 @@ async function agentCommandInternal(
) {
const resolvedDeps = await resolveAgentCommandDeps(deps);
const isRawModelRun = opts.modelRun === true || opts.promptMode === "none";
const suppressVisibleSessionEffects = opts.sessionEffects === "internal";
const prepared = await prepareAgentCommandExecution(opts, runtime);
const {
body,
@@ -580,9 +582,14 @@ async function agentCommandInternal(
if (!isRawModelRun && acpResolution?.kind === "ready" && sessionKey) {
const attemptExecutionRuntime = await loadAttemptExecutionRuntime();
const startedAt = Date.now();
registerAgentRunContext(runId, {
sessionKey,
});
registerAgentRunContext(
runId,
suppressVisibleSessionEffects
? { isControlUiVisible: false }
: {
sessionKey,
},
);
attemptExecutionRuntime.emitAcpLifecycleStart({ runId, startedAt });
const visibleTextAccumulator = attemptExecutionRuntime.createAcpVisibleTextAccumulator();
@@ -676,21 +683,53 @@ async function agentCommandInternal(
const finalTextRaw = visibleTextAccumulator.finalizeRaw();
const finalText = visibleTextAccumulator.finalize();
try {
const { resolveAcpSessionCwd } = await loadAcpSessionIdentifiersRuntime();
const [{ resolveAcpSessionCwd }, { resolveSessionTranscriptFile }] = await Promise.all([
loadAcpSessionIdentifiersRuntime(),
loadTranscriptResolveRuntime(),
]);
const internalSource = suppressVisibleSessionEffects
? await resolveSessionTranscriptFile({
sessionId,
sessionKey,
sessionEntry,
agentId: sessionAgentId,
threadId: opts.threadId,
})
: undefined;
const internalSessionFile = suppressVisibleSessionEffects
? await prepareInternalSessionEffectsTranscript({
sessionFile: internalSource?.sessionFile,
runId,
})
: undefined;
const transcriptSessionEntry: SessionEntry | undefined = internalSessionFile
? {
...(sessionEntry ?? {
sessionId,
updatedAt: Date.now(),
sessionStartedAt: Date.now(),
}),
sessionId,
sessionFile: internalSessionFile,
}
: sessionEntry;
sessionEntry = await attemptExecutionRuntime.persistAcpTurnTranscript({
body,
transcriptBody,
finalText: finalTextRaw,
sessionId,
sessionKey,
sessionEntry,
sessionStore,
storePath,
sessionEntry: transcriptSessionEntry,
sessionStore: suppressVisibleSessionEffects ? undefined : sessionStore,
storePath: suppressVisibleSessionEffects ? undefined : storePath,
sessionAgentId,
threadId: opts.threadId,
sessionCwd: resolveAcpSessionCwd(acpResolution.meta) ?? workspaceDir,
config: cfg,
});
if (internalSessionFile) {
sessionEntry = prepared.sessionEntry;
}
} catch (error) {
log.warn(
`ACP transcript persistence failed for ${sessionKey}: ${formatErrorMessage(error)}`,
@@ -722,10 +761,11 @@ async function agentCommandInternal(
const resolvedVerboseLevel =
verboseOverride ?? persistedVerbose ?? (agentCfg?.verboseDefault as VerboseLevel | undefined);
if (sessionKey) {
if (sessionKey || suppressVisibleSessionEffects) {
registerAgentRunContext(runId, {
sessionKey,
...(sessionKey && !suppressVisibleSessionEffects ? { sessionKey } : {}),
verboseLevel: resolvedVerboseLevel,
isControlUiVisible: !suppressVisibleSessionEffects,
});
}
@@ -772,7 +812,13 @@ async function agentCommandInternal(
? undefined
: await hydrateResolvedSkillsAsync(currentSkillsSnapshot, buildSkillsSnapshot);
if (skillsSnapshot && sessionStore && sessionKey && needsSkillsSnapshot) {
if (
skillsSnapshot &&
sessionStore &&
sessionKey &&
needsSkillsSnapshot &&
!suppressVisibleSessionEffects
) {
const now = Date.now();
const current = sessionEntry ?? {
sessionId,
@@ -796,7 +842,7 @@ async function agentCommandInternal(
}
// Persist explicit /command overrides to the session store when we have a key.
if (sessionStore && sessionKey) {
if (sessionStore && sessionKey && !suppressVisibleSessionEffects) {
const now = Date.now();
const entry = sessionStore[sessionKey] ??
sessionEntry ?? { sessionId, updatedAt: now, sessionStartedAt: now };
@@ -877,7 +923,13 @@ async function agentCommandInternal(
allowedModelCatalog = visibilityPolicy.allowedCatalog;
}
if (sessionEntry && sessionStore && sessionKey && hasStoredOverride) {
if (
sessionEntry &&
sessionStore &&
sessionKey &&
hasStoredOverride &&
!suppressVisibleSessionEffects
) {
const entry = sessionEntry;
const repaired = repairProviderWrappedModelOverride({
entry,
@@ -1029,7 +1081,7 @@ async function agentCommandInternal(
authProfileOverrideSource: undefined,
authProfileOverrideCompactionCount: undefined,
};
} else if (sessionStore && sessionKey) {
} else if (sessionStore && sessionKey && !suppressVisibleSessionEffects) {
await clearSessionAuthProfileOverride({
sessionEntry: entry,
sessionStore,
@@ -1083,7 +1135,8 @@ async function agentCommandInternal(
sessionEntry &&
sessionStore &&
sessionKey &&
sessionEntry.thinkingLevel === previousThinkLevel
sessionEntry.thinkingLevel === previousThinkLevel &&
!suppressVisibleSessionEffects
) {
const entry = sessionEntry;
entry.thinkingLevel = fallbackThinkLevel;
@@ -1103,8 +1156,8 @@ async function agentCommandInternal(
const resolvedSessionFile = await resolveSessionTranscriptFile({
sessionId,
sessionKey,
sessionStore,
storePath,
sessionStore: suppressVisibleSessionEffects ? undefined : sessionStore,
storePath: suppressVisibleSessionEffects ? undefined : storePath,
sessionEntry,
agentId: sessionAgentId,
threadId: opts.threadId,
@@ -1124,6 +1177,9 @@ async function agentCommandInternal(
sessionFile = resolvedSessionFile.sessionFile;
sessionEntry = resolvedSessionFile.sessionEntry;
}
const attemptSessionFile = suppressVisibleSessionEffects
? await prepareInternalSessionEffectsTranscript({ sessionFile, runId })
: sessionFile;
const startedAt = Date.now();
const attemptLifecycleState = {
@@ -1295,7 +1351,7 @@ async function agentCommandInternal(
sessionId,
sessionKey,
sessionAgentId,
sessionFile,
sessionFile: attemptSessionFile,
workspaceDir,
body,
isFallbackRetry,
@@ -1317,11 +1373,12 @@ async function agentCommandInternal(
resolvedVerboseLevel,
agentDir,
authProfileProvider: providerForAuthProfileValidation,
sessionStore,
storePath,
sessionStore: suppressVisibleSessionEffects ? undefined : sessionStore,
storePath: suppressVisibleSessionEffects ? undefined : storePath,
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
sessionHasHistory:
!isNewSession || (await attemptExecutionRuntime.sessionFileHasContent(sessionFile)),
!isNewSession ||
(await attemptExecutionRuntime.sessionFileHasContent(attemptSessionFile)),
suppressPromptPersistenceOnRetry:
opts.suppressPromptPersistence === true ||
(isFallbackRetry && attemptLifecycleState.currentTurnUserMessagePersisted),
@@ -1340,6 +1397,7 @@ async function agentCommandInternal(
sessionEntry &&
sessionStore &&
sessionKey &&
!suppressVisibleSessionEffects &&
entryMatchesAutoFallbackPrimaryProbe(sessionEntry, autoFallbackPrimaryProbe)
) {
const nextSessionEntry = { ...sessionEntry };
@@ -1493,7 +1551,7 @@ async function agentCommandInternal(
await fallbackTrajectoryRecorder?.flush();
// Update token+model fields in the session store.
if (sessionStore && sessionKey) {
if (sessionStore && sessionKey && !suppressVisibleSessionEffects) {
const { updateSessionStoreAfterAgentRun } = await loadSessionStoreRuntime();
await updateSessionStoreAfterAgentRun({
cfg,
@@ -1524,28 +1582,42 @@ async function agentCommandInternal(
if (transcriptPersistenceRunner === "cli" || embeddedAssistantGapFill) {
let persistedCliTurnTranscript = false;
try {
const transcriptSessionEntry: SessionEntry | undefined = suppressVisibleSessionEffects
? {
...(sessionEntry ?? {
sessionId,
updatedAt: Date.now(),
sessionStartedAt: Date.now(),
}),
sessionId,
sessionFile: attemptSessionFile,
}
: sessionEntry;
sessionEntry = await attemptExecutionRuntime.persistCliTurnTranscript({
body,
transcriptBody,
result,
sessionId,
sessionKey: sessionKey ?? sessionId,
sessionEntry,
sessionStore,
storePath,
sessionEntry: transcriptSessionEntry,
sessionStore: suppressVisibleSessionEffects ? undefined : sessionStore,
storePath: suppressVisibleSessionEffects ? undefined : storePath,
sessionAgentId,
threadId: opts.threadId,
sessionCwd: workspaceDir,
config: cfg,
embeddedAssistantGapFill,
});
if (suppressVisibleSessionEffects) {
sessionEntry = prepared.sessionEntry;
}
persistedCliTurnTranscript = true;
} catch (error) {
log.warn(
`Turn transcript persistence failed for ${sessionKey ?? sessionId}: ${error instanceof Error ? error.message : String(error)}`,
);
}
if (persistedCliTurnTranscript) {
if (persistedCliTurnTranscript && !suppressVisibleSessionEffects) {
sessionEntry = await (
await loadCliCompactionRuntime()
).runCliTurnCompactionLifecycle({
@@ -1579,6 +1651,7 @@ async function agentCommandInternal(
opts.deliver === true &&
sessionStore &&
sessionKey &&
!suppressVisibleSessionEffects &&
payloads.length > 0 &&
!isSubagentSessionKey(sessionKey)
) {
@@ -1612,7 +1685,7 @@ async function agentCommandInternal(
const { deliverAgentCommandResult } = await loadDeliveryRuntime();
const resolveFreshSessionEntryForDelivery =
sessionStore && sessionKey
sessionStore && sessionKey && !suppressVisibleSessionEffects
? async (): Promise<SessionEntry | undefined> => {
const { loadSessionStore } = await loadSessionStoreRuntime();
const freshStore = loadSessionStore(storePath, {
@@ -1648,7 +1721,12 @@ async function agentCommandInternal(
);
// Phase 2: Clear pending delivery payload after successful delivery.
if (sessionStore && sessionKey && !isSubagentSessionKey(sessionKey)) {
if (
sessionStore &&
sessionKey &&
!isSubagentSessionKey(sessionKey) &&
!suppressVisibleSessionEffects
) {
const entry = sessionStore[sessionKey] ?? sessionEntry;
const noPendingTextForThisRun =
opts.deliver === true &&

View File

@@ -106,6 +106,8 @@ export type AgentCommandOpts = {
bootstrapContextRunKind?: "default" | "heartbeat" | "cron";
internalEvents?: AgentInternalEvent[];
inputProvenance?: InputProvenance;
/** Internal runs can execute against a session without updating visible status/model/usage. */
sessionEffects?: "visible" | "internal";
/** Visible source replies must be sent through the message tool when set. */
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
/** Internal runs can omit the channel message tool entirely. */

View File

@@ -0,0 +1,72 @@
import fs from "node:fs/promises";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import {
prepareInternalSessionEffectsTranscript,
removeInternalSessionEffectsTranscript,
} from "./internal-session-effects.js";
// Value of OPENCLAW_STATE_DIR when this test module was loaded.
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;

// Each test points OPENCLAW_STATE_DIR at its own temp dir; put the variable back
// afterwards (or remove it again if it was not set to begin with).
afterEach(() => {
  if (ORIGINAL_STATE_DIR !== undefined) {
    process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
    return;
  }
  delete process.env.OPENCLAW_STATE_DIR;
});
// File-system level tests for the private transcript helpers. Every case sets
// OPENCLAW_STATE_DIR to a fresh temp dir and expects the private file to appear
// under `<dir>/internal-agent-runs`.
describe("prepareInternalSessionEffectsTranscript", () => {
it("creates a private transcript even without a visible source file", async () => {
await withTempDir({ prefix: "openclaw-internal-session-effects-" }, async (dir) => {
process.env.OPENCLAW_STATE_DIR = dir;
const sessionFile = await prepareInternalSessionEffectsTranscript({
runId: "run/with space",
});
// "/" and " " in the run id are replaced with "_" in the file name.
expect(sessionFile).toBe(path.join(dir, "internal-agent-runs", "run_with_space.jsonl"));
expect(await fs.readFile(sessionFile, "utf8")).toBe("");
// Only the permission bits are compared: owner read/write, nothing else.
expect((await fs.stat(sessionFile)).mode & 0o777).toBe(0o600);
// Removal deletes the file, so a later stat fails with ENOENT.
await removeInternalSessionEffectsTranscript(sessionFile);
await expect(fs.stat(sessionFile)).rejects.toMatchObject({ code: "ENOENT" });
});
});
it("copies a visible source transcript into a private transcript", async () => {
await withTempDir({ prefix: "openclaw-internal-session-effects-" }, async (dir) => {
process.env.OPENCLAW_STATE_DIR = dir;
const sourceFile = path.join(dir, "visible-session.jsonl");
// The source is written with a wider mode (0o644) than the private copy may have.
await fs.writeFile(sourceFile, '{"role":"assistant","content":"done"}\n', {
mode: 0o644,
});
const sessionFile = await prepareInternalSessionEffectsTranscript({
sessionFile: sourceFile,
runId: "run-copy",
});
// Contents are copied verbatim, but the copy is 0o600 regardless of the source mode.
expect(await fs.readFile(sessionFile, "utf8")).toBe(
'{"role":"assistant","content":"done"}\n',
);
expect((await fs.stat(sessionFile)).mode & 0o777).toBe(0o600);
});
});
it("creates an empty private transcript when the visible source is missing", async () => {
await withTempDir({ prefix: "openclaw-internal-session-effects-" }, async (dir) => {
process.env.OPENCLAW_STATE_DIR = dir;
// The source path is never created; the helper must fall back to an empty file.
const sessionFile = await prepareInternalSessionEffectsTranscript({
sessionFile: path.join(dir, "missing-session.jsonl"),
runId: "run-missing-source",
});
expect(await fs.readFile(sessionFile, "utf8")).toBe("");
expect((await fs.stat(sessionFile)).mode & 0o777).toBe(0o600);
});
});
});

View File

@@ -0,0 +1,50 @@
import { promises as fs } from "node:fs";
import path from "node:path";
import { resolveStateDir } from "../config/paths.js";
/**
 * Builds the path of the private transcript file for an internal run.
 *
 * The run id is turned into a file name by replacing every character outside
 * `[a-zA-Z0-9._-]` with `_` and keeping at most the first 120 characters; an
 * empty result falls back to `"run"`. The file is placed in the
 * `internal-agent-runs` directory under the resolved state dir.
 *
 * @param runId Identifier of the run the transcript belongs to.
 * @returns Path of the form `<state dir>/internal-agent-runs/<safe id>.jsonl`.
 */
export function resolveInternalSessionEffectsTranscriptPath(runId: string): string {
  const sanitized = runId.replace(/[^a-zA-Z0-9._-]/g, "_");
  const capped = sanitized.slice(0, 120);
  // `||` (not `??`) on purpose: an empty string must also fall back.
  const fileStem = capped || "run";
  return path.join(resolveStateDir(), "internal-agent-runs", `${fileStem}.jsonl`);
}
/**
 * Creates the private transcript file for an internal run and returns its path.
 *
 * When `params.sessionFile` is given and readable, its bytes are copied into the
 * private file. When it is omitted, or the source does not exist (`ENOENT`), the
 * private file is created empty. The parent directory is created on demand with
 * mode `0o700` and the file is forced to mode `0o600`.
 *
 * @param params.sessionFile Optional visible transcript to seed the private copy from.
 * @param params.runId Run identifier used to derive the private file name.
 * @returns Path of the private transcript file.
 * @throws Any error from creating the directory, from reading the source (other
 *   than `ENOENT`), or from writing/chmod-ing the private file.
 */
export async function prepareInternalSessionEffectsTranscript(params: {
  sessionFile?: string;
  runId: string;
}): Promise<string> {
  // Callers must persist this path in an owning lifecycle record and invoke
  // removeInternalSessionEffectsTranscript once the recovered output is no longer needed.
  const sessionFile = resolveInternalSessionEffectsTranscriptPath(params.runId);
  await fs.mkdir(path.dirname(sessionFile), { recursive: true, mode: 0o700 });

  let contents: string | Uint8Array = "";
  if (params.sessionFile) {
    try {
      contents = await fs.readFile(params.sessionFile);
    } catch (error) {
      // Only a missing *source* downgrades to an empty transcript. The handler is
      // scoped to the read so that an ENOENT from writing the private file is
      // reported instead of being mistaken for "no source" and replaced by an
      // empty transcript.
      if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
        throw error;
      }
    }
  }

  await fs.writeFile(sessionFile, contents, { mode: 0o600 });
  // `mode` is only honoured when the file is created; chmod also tightens a
  // private file left over from an earlier run with the same id.
  await fs.chmod(sessionFile, 0o600);
  return sessionFile;
}
/**
 * Deletes a private internal-run transcript on a best-effort basis.
 *
 * The file is removed only when its parent directory is exactly the
 * `internal-agent-runs` directory under the resolved state dir. `undefined`,
 * an empty string, or a path anywhere else is ignored. Removal errors are
 * swallowed so cleanup never fails the caller.
 *
 * @param sessionFile Path previously returned for the private transcript, if any.
 */
export async function removeInternalSessionEffectsTranscript(
  sessionFile: string | undefined,
): Promise<void> {
  const runsDir = path.join(resolveStateDir(), "internal-agent-runs");
  if (!sessionFile) {
    return;
  }
  const target = path.resolve(sessionFile);
  // Refuse to touch anything that is not a direct child of the private runs dir.
  const isDirectChild = path.dirname(target) === path.resolve(runsDir);
  if (!isDirectChild) {
    return;
  }
  try {
    await fs.rm(target, { force: true });
  } catch {
    // Best-effort privacy/disk cleanup; run cleanup must not fail on temp-file races.
  }
}

View File

@@ -7,13 +7,20 @@ import {
} from "./subagent-announce-output.js";
type CallGateway = typeof import("../gateway/call.js").callGateway;
type ReadSessionMessagesAsync =
typeof import("./subagent-announce.runtime.js").readSessionMessagesAsync;
function installOutputDeps(params: { messages: Array<unknown> }) {
function installOutputDeps(params: {
messages: Array<unknown>;
transcriptMessages?: Array<unknown>;
}) {
const callGateway = vi.fn(async () => ({ messages: params.messages }));
const readSessionMessagesAsync = vi.fn(async () => params.transcriptMessages ?? []);
testing.setDepsForTest({
callGateway: callGateway as unknown as CallGateway,
readSessionMessagesAsync: readSessionMessagesAsync as unknown as ReadSessionMessagesAsync,
});
return { callGateway };
return { callGateway, readSessionMessagesAsync };
}
function sessionsYieldTurn(message = "Waiting for subagent completion.") {
@@ -146,6 +153,56 @@ describe("readSubagentOutput", () => {
await expect(readSubagentOutput("agent:main:subagent:child")).resolves.toBeUndefined();
});
// When a private transcript path is supplied, the output must come from that
// transcript and the gateway `chat.history` call must not be made at all.
it("reads recovered output from the private transcript before gateway history", async () => {
const deps = installOutputDeps({
// What the gateway would return; must be ignored in this scenario.
messages: [
{
role: "assistant",
content: [{ type: "text", text: "stale visible output" }],
},
],
// What the private transcript reader returns.
transcriptMessages: [
{
role: "assistant",
stopReason: "stop",
content: [{ type: "text", text: "fresh recovered output" }],
},
],
});
await expect(
readSubagentOutput("agent:main:subagent:child", undefined, {
sessionFile: "/tmp/openclaw-internal-run.jsonl",
}),
).resolves.toBe("fresh recovered output");
// The reader is called with the session key, the private file and the bounded
// "recent" window (100 messages / 1 MiB).
expect(deps.readSessionMessagesAsync).toHaveBeenCalledWith(
"agent:main:subagent:child",
undefined,
"/tmp/openclaw-internal-run.jsonl",
{ mode: "recent", maxMessages: 100, maxBytes: 1024 * 1024 },
);
expect(deps.callGateway).not.toHaveBeenCalled();
});
// An empty private transcript must not fall back to gateway history: the result
// is undefined and the stale visible output is never consulted.
it("does not read visible gateway history when a private transcript is empty", async () => {
const deps = installOutputDeps({
messages: [
{
role: "assistant",
content: [{ type: "text", text: "stale visible output" }],
},
],
// The private transcript reader yields no messages.
transcriptMessages: [],
});
await expect(
readSubagentOutput("agent:main:subagent:child", undefined, {
sessionFile: "/tmp/openclaw-empty-internal-run.jsonl",
}),
).resolves.toBeUndefined();
expect(deps.callGateway).not.toHaveBeenCalled();
});
});
describe("buildChildCompletionFindings", () => {
@@ -155,7 +212,7 @@ describe("buildChildCompletionFindings", () => {
childSessionKey: "agent:main:subagent:silent",
task: "silent task",
createdAt: 1,
frozenResultText: "ANNOUNCE_SKIP",
completion: { resultText: "ANNOUNCE_SKIP" },
outcome: { status: "ok" },
},
]);
@@ -169,7 +226,7 @@ describe("buildChildCompletionFindings", () => {
childSessionKey: "agent:main:subagent:silent",
task: "silent task",
createdAt: 1,
frozenResultText: "ANNOUNCE_SKIP",
completion: { resultText: "ANNOUNCE_SKIP" },
outcome: { status: "error", error: "boom" },
},
]);
@@ -184,14 +241,14 @@ describe("buildChildCompletionFindings", () => {
childSessionKey: "agent:main:subagent:silent",
task: "silent task",
createdAt: 1,
frozenResultText: "ANNOUNCE_SKIP",
completion: { resultText: "ANNOUNCE_SKIP" },
outcome: { status: "ok" },
},
{
childSessionKey: "agent:main:subagent:visible",
task: "visible task",
createdAt: 2,
frozenResultText: "actual output",
completion: { resultText: "actual output" },
outcome: { status: "ok" },
},
]);

View File

@@ -9,6 +9,7 @@ import {
callGateway,
getRuntimeConfig,
readSessionEntry,
readSessionMessagesAsync,
resolveAgentIdFromSessionKey,
resolveStorePath,
} from "./subagent-announce.runtime.js";
@@ -22,6 +23,7 @@ type SubagentAnnounceOutputDeps = {
callGateway: typeof callGateway;
getRuntimeConfig: typeof getRuntimeConfig;
readSessionEntry: typeof readSessionEntry;
readSessionMessagesAsync: typeof readSessionMessagesAsync;
resolveAgentIdFromSessionKey: typeof resolveAgentIdFromSessionKey;
resolveStorePath: typeof resolveStorePath;
};
@@ -30,6 +32,7 @@ const defaultSubagentAnnounceOutputDeps: SubagentAnnounceOutputDeps = {
callGateway,
getRuntimeConfig,
readSessionEntry,
readSessionMessagesAsync,
resolveAgentIdFromSessionKey,
resolveStorePath,
};
@@ -194,13 +197,31 @@ function selectSubagentOutputText(snapshot: SubagentOutputSnapshot): string | un
export async function readSubagentOutput(
sessionKey: string,
_outcome?: SubagentRunOutcome,
options?: { sessionFile?: string },
): Promise<string | undefined> {
const history = await subagentAnnounceOutputDeps.callGateway({
method: "chat.history",
params: { sessionKey, limit: 100 },
});
const messages = Array.isArray(history?.messages) ? history.messages : [];
const snapshot = summarizeSubagentOutputHistory(messages);
let messages: unknown[] | undefined;
if (options?.sessionFile) {
const transcriptMessages = await subagentAnnounceOutputDeps.readSessionMessagesAsync(
sessionKey,
undefined,
options.sessionFile,
{
mode: "recent",
maxMessages: 100,
maxBytes: 1024 * 1024,
},
);
messages = transcriptMessages;
}
const history =
messages === undefined
? await subagentAnnounceOutputDeps.callGateway({
method: "chat.history",
params: { sessionKey, limit: 100 },
})
: undefined;
const sourceMessages = messages ?? (Array.isArray(history?.messages) ? history.messages : []);
const snapshot = summarizeSubagentOutputHistory(sourceMessages);
const selected = selectSubagentOutputText(snapshot);
if (selected?.trim()) {
return selected;
@@ -272,7 +293,7 @@ export function applySubagentWaitOutcome(params: {
export async function captureSubagentCompletionReply(
sessionKey: string,
options?: { waitForReply?: boolean; outcome?: SubagentRunOutcome },
options?: { waitForReply?: boolean; outcome?: SubagentRunOutcome; sessionFile?: string },
): Promise<string | undefined> {
return await captureSubagentCompletionReplyUsing({
sessionKey,
@@ -280,7 +301,9 @@ export async function captureSubagentCompletionReply(
maxWaitMs: isFastTestMode() ? 50 : 1_500,
retryIntervalMs: isFastTestMode() ? FAST_TEST_RETRY_INTERVAL_MS : 100,
readSubagentOutput: async (nextSessionKey) =>
await readSubagentOutput(nextSessionKey, options?.outcome),
await readSubagentOutput(nextSessionKey, options?.outcome, {
sessionFile: options?.sessionFile,
}),
});
}
@@ -316,7 +339,9 @@ export function buildChildCompletionFindings(
label?: string;
createdAt: number;
endedAt?: number;
frozenResultText?: string | null;
completion?: {
resultText?: string | null;
};
outcome?: SubagentRunOutcome;
}>,
): string | undefined {
@@ -331,7 +356,7 @@ export function buildChildCompletionFindings(
const sections: string[] = [];
for (const [index, child] of sorted.entries()) {
const resultText = child.frozenResultText?.trim();
const resultText = child.completion?.resultText?.trim();
const outcome = describeSubagentOutcome(child.outcome);
if (
child.outcome?.status === "ok" &&
@@ -367,7 +392,9 @@ export function dedupeLatestChildCompletionRows(
label?: string;
createdAt: number;
endedAt?: number;
frozenResultText?: string | null;
completion?: {
resultText?: string | null;
};
outcome?: SubagentRunOutcome;
}>,
) {
@@ -390,7 +417,9 @@ export function filterCurrentDirectChildCompletionRows(
label?: string;
createdAt: number;
endedAt?: number;
frozenResultText?: string | null;
completion?: {
resultText?: string | null;
};
outcome?: SubagentRunOutcome;
}>,
params: {

View File

@@ -314,11 +314,11 @@ describeLive("subagent announce live", () => {
return listSubagentRunsForRequester(sessionKey).find(
(run) =>
run.taskName === "issue_82913_child" &&
run.frozenResultText?.includes(childToken) === true &&
run.completion?.resultText?.includes(childToken) === true &&
run.outcome?.status === "ok",
);
});
expect(completedRunBeforeDelivery.completionAnnouncedAt).toBeUndefined();
expect(completedRunBeforeDelivery.delivery?.announcedAt).toBeUndefined();
expect(parentObservedAt).toBeUndefined();
const parent = await initialRequest;
@@ -330,14 +330,14 @@ describeLive("subagent announce live", () => {
listSubagentRunsForRequester(sessionKey).find(
(run) =>
run.runId === completedRunBeforeDelivery.runId &&
typeof run.completionEnqueuedAt === "number" &&
typeof run.completionDeliveredAt === "number" &&
typeof run.completionAnnouncedAt === "number",
typeof run.delivery?.enqueuedAt === "number" &&
typeof run.delivery?.deliveredAt === "number" &&
typeof run.delivery?.announcedAt === "number",
),
);
const enqueuedAt = completedRun.completionEnqueuedAt!;
const deliveredAt = completedRun.completionDeliveredAt!;
const announcedAt = completedRun.completionAnnouncedAt!;
const enqueuedAt = completedRun.delivery?.enqueuedAt ?? 0;
const deliveredAt = completedRun.delivery?.deliveredAt ?? 0;
const announcedAt = completedRun.delivery?.announcedAt ?? 0;
const enqueuedToDeliveredMs = deliveredAt - enqueuedAt;
const announcedToParentObservedMs = Math.abs(parentObservedAt - announcedAt);
console.log(
@@ -352,7 +352,7 @@ describeLive("subagent announce live", () => {
announcedToParentObservedMs,
})}`,
);
expect(completedRun.completionAnnouncedAt).toBe(deliveredAt);
expect(completedRun.delivery?.announcedAt).toBe(deliveredAt);
expect(enqueuedToDeliveredMs).toBeGreaterThan(10_000);
expect(announcedToParentObservedMs).toBeLessThan(20_000);
},
@@ -499,12 +499,12 @@ describeLive("subagent announce live", () => {
return listSubagentRunsForRequester(sessionKey).find(
(run) =>
run.taskName === "steered_child" &&
run.frozenResultText?.includes(childToken) === true &&
run.completion?.resultText?.includes(childToken) === true &&
run.outcome?.status === "ok",
);
});
expect(steeredRun.endedReason).toBe("subagent-complete");
expect(steeredRun.lastAnnounceDeliveryError).toBeUndefined();
expect(steeredRun.delivery?.lastError).toBeUndefined();
await waitFor("in-process subagent completion agent dispatch start", () => {
if (initialError) {
@@ -659,7 +659,8 @@ describeLive("subagent announce live", () => {
const completed = childTokens.every((childToken) =>
runs.some(
(run) =>
run.frozenResultText?.includes(childToken) === true && run.outcome?.status === "ok",
run.completion?.resultText?.includes(childToken) === true &&
run.outcome?.status === "ok",
),
);
return completed ? runs : undefined;
@@ -667,7 +668,9 @@ describeLive("subagent announce live", () => {
expect(completedRuns).toHaveLength(3);
for (const childToken of childTokens) {
expect(completedRuns.some((run) => run.frozenResultText?.includes(childToken))).toBe(true);
expect(completedRuns.some((run) => run.completion?.resultText?.includes(childToken))).toBe(
true,
);
}
const parent = await initialRequest;

View File

@@ -6,5 +6,6 @@ export {
resolveStorePath,
} from "../config/sessions.js";
export { callGateway } from "../gateway/call.js";
export { readSessionMessagesAsync } from "../gateway/session-utils.fs.js";
export { dispatchGatewayMethodInProcess } from "../gateway/server-plugins.js";
export { isEmbeddedPiRunActive, waitForEmbeddedPiRunEnd } from "./pi-embedded-runner/runs.js";

View File

@@ -0,0 +1,120 @@
import { describe, expect, it } from "vitest";
import { normalizeSubagentRunState } from "./subagent-delivery-state.js";
import type { LegacySubagentRunRecord } from "./subagent-delivery-state.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
/**
 * Test fixture: returns a fresh run record with fixed default values, with any
 * field in `overrides` replacing or extending the defaults.
 */
function baseRun(overrides: Partial<LegacySubagentRunRecord> = {}): LegacySubagentRunRecord {
  const defaults: LegacySubagentRunRecord = {
    runId: "run-1",
    childSessionKey: "agent:main:subagent:child",
    requesterSessionKey: "agent:main:parent",
    requesterDisplayKey: "agent:main:parent",
    controllerSessionKey: "agent:main:parent",
    task: "inspect",
    cleanup: "keep",
    spawnMode: "run",
    createdAt: 100,
    startedAt: 100,
    expectsCompletionMessage: true,
  };
  // Overrides are spread last so they win over the defaults.
  return { ...defaults, ...overrides };
}
// Tests for normalizeSubagentRunState: flat legacy fields on a run record are
// expected to end up in the nested `completion` / `delivery` objects, and the
// `cleanupHandled` flag is expected to survive only for discarded deliveries.
describe("normalizeSubagentRunState", () => {
it("migrates legacy pending delivery fields into nested completion and delivery state", () => {
const entry = normalizeSubagentRunState(
baseRun({
frozenResultText: "child output",
frozenResultCapturedAt: 200,
pendingFinalDelivery: true,
pendingFinalDeliveryCreatedAt: 210,
pendingFinalDeliveryLastAttemptAt: 220,
pendingFinalDeliveryAttemptCount: 3,
pendingFinalDeliveryLastError: "sink unavailable",
pendingFinalDeliveryPayload: {
requesterSessionKey: "agent:main:parent",
requesterDisplayKey: "agent:main:parent",
childSessionKey: "agent:main:subagent:child",
childRunId: "run-1",
task: "inspect",
startedAt: 100,
expectsCompletionMessage: true,
frozenResultText: "child output",
},
}),
// Widened so the test can assert that the legacy keys are gone from the result.
) as SubagentRunRecord & { pendingFinalDelivery?: boolean; frozenResultText?: string };
// frozenResult* fields map onto `completion`.
expect(entry.completion).toMatchObject({
required: true,
resultText: "child output",
capturedAt: 200,
});
// pendingFinalDelivery* fields map onto `delivery` with status "pending".
expect(entry.delivery).toMatchObject({
status: "pending",
createdAt: 210,
lastAttemptAt: 220,
attemptCount: 3,
lastError: "sink unavailable",
payload: expect.objectContaining({ childRunId: "run-1" }),
});
expect(entry.pendingFinalDelivery).toBeUndefined();
expect(entry.frozenResultText).toBeUndefined();
});
it("merges partial nested state with legacy fields before stripping legacy fields", () => {
// The record already has partial nested state AND legacy fields; the legacy
// pending flag is expected to override the nested "not_required" status.
const entry = normalizeSubagentRunState(
baseRun({
completion: { required: true },
delivery: { status: "not_required" },
pendingFinalDelivery: true,
pendingFinalDeliveryAttemptCount: 2,
lastAnnounceRetryAt: 240,
frozenResultText: "legacy result",
}),
) as SubagentRunRecord & { pendingFinalDelivery?: boolean; lastAnnounceRetryAt?: number };
expect(entry.completion?.resultText).toBe("legacy result");
// lastAnnounceRetryAt (240) shows up as delivery.lastAttemptAt.
expect(entry.delivery).toMatchObject({
status: "pending",
attemptCount: 2,
lastAttemptAt: 240,
});
expect(entry.pendingFinalDelivery).toBeUndefined();
expect(entry.lastAnnounceRetryAt).toBeUndefined();
});
it("clears stale cleanupHandled locks for unfinished restored cleanup", () => {
// No delivery state at all: a persisted `cleanupHandled: true` is reset.
const entry = normalizeSubagentRunState(baseRun({ cleanupHandled: true }));
expect(entry.cleanupHandled).toBe(false);
});
it("clears stale cleanupHandled locks after delivered notification if cleanup did not finish", () => {
// A "delivered" delivery does not keep the flag set either.
const entry = normalizeSubagentRunState(
baseRun({
cleanupHandled: true,
delivery: {
status: "delivered",
announcedAt: 400,
},
}),
);
expect(entry.cleanupHandled).toBe(false);
});
it("keeps discarded terminal delivery dormant across restart", () => {
// A "discarded" delivery is the one case here where the flag stays true.
const entry = normalizeSubagentRunState(
baseRun({
cleanupHandled: true,
delivery: {
status: "discarded",
discardedAt: 400,
discardReason: "expired",
},
}),
);
expect(entry.cleanupHandled).toBe(true);
});
});

View File

@@ -0,0 +1,258 @@
import type {
PendingFinalDeliveryPayload,
SubagentCompletionDeliveryState,
SubagentCompletionState,
SubagentExecutionState,
SubagentRunRecord,
} from "./subagent-registry.types.js";
/**
 * A run record that may still carry the flat, pre-split bookkeeping fields.
 *
 * `normalizeSubagentRunState` reads these fields to build the nested
 * `completion` and `delivery` state and then deletes every one of them from
 * the record.
 */
export type LegacySubagentRunRecord = SubagentRunRecord & {
// Announce retry bookkeeping; used as the fallback source for delivery attemptCount,
// lastAttemptAt and lastError when no pendingFinalDelivery* value is present.
announceRetryCount?: number;
lastAnnounceRetryAt?: number;
lastAnnounceDeliveryError?: string;
// Captured child result; migrated to completion.resultText / completion.capturedAt.
frozenResultText?: string | null;
frozenResultCapturedAt?: number;
// Migrated to completion.fallbackResultText / completion.fallbackCapturedAt.
fallbackFrozenResultText?: string | null;
fallbackFrozenResultCapturedAt?: number;
// Pending final delivery bookkeeping; migrated to delivery payload, createdAt,
// lastAttemptAt, attemptCount and lastError.
pendingFinalDelivery?: boolean;
pendingFinalDeliveryCreatedAt?: number;
pendingFinalDeliveryLastAttemptAt?: number;
pendingFinalDeliveryAttemptCount?: number;
pendingFinalDeliveryLastError?: string | null;
pendingFinalDeliveryPayload?: PendingFinalDeliveryPayload;
// A numeric deliverySuspendedAt yields delivery status "suspended".
deliverySuspendedAt?: number;
deliverySuspendedReason?: "retry-limit" | "expiry";
// A numeric deliveryDiscardedAt yields delivery status "discarded".
deliveryDiscardedAt?: number;
deliveryDiscardReason?: "expired" | "pressure-pruned";
deliveryDiscardedPayloadSummary?: SubagentCompletionDeliveryState["discardedPayloadSummary"];
// Delivery timestamps; migrated to delivery.enqueuedAt / deliveredAt / announcedAt.
// A numeric completionAnnouncedAt yields delivery status "delivered".
completionEnqueuedAt?: number;
completionDeliveredAt?: number;
completionAnnouncedAt?: number;
// Migrated to delivery.lastDropReason.
lastAnnounceDropReason?: SubagentCompletionDeliveryState["lastDropReason"];
};
/**
 * Upgrades a run record in place to the nested execution / completion /
 * delivery layout and strips every flat legacy field afterwards.
 *
 * Nested state already present on the record is merged with state rebuilt
 * from the legacy fields; see the individual merge helpers for precedence.
 *
 * @param entry Run record to normalize; it is mutated.
 * @returns The same `entry` object that was passed in.
 */
export function normalizeSubagentRunState(entry: SubagentRunRecord): SubagentRunRecord {
  const legacy = entry as LegacySubagentRunRecord;

  // The builders read the legacy fields, so all three states must be merged
  // before anything is deleted below.
  entry.execution = mergeExecutionState(entry.execution, buildExecutionState(entry));
  entry.completion = mergeCompletionState(entry.completion, buildCompletionState(entry, legacy));
  entry.delivery = mergeDeliveryState(entry, entry.delivery, buildDeliveryState(entry, legacy));

  // cleanupHandled is an in-process lock; after restart, unfinished cleanup must retry.
  const cleanupFinished = typeof entry.cleanupCompletedAt === "number";
  const deliveryDiscarded = entry.delivery?.status === "discarded";
  if (entry.cleanupHandled === true && !cleanupFinished && !deliveryDiscarded) {
    entry.cleanupHandled = false;
  }

  // Flat fields that have been folded into the nested state above.
  const migratedLegacyKeys = [
    "announceRetryCount",
    "lastAnnounceRetryAt",
    "lastAnnounceDeliveryError",
    "frozenResultText",
    "frozenResultCapturedAt",
    "fallbackFrozenResultText",
    "fallbackFrozenResultCapturedAt",
    "pendingFinalDelivery",
    "pendingFinalDeliveryCreatedAt",
    "pendingFinalDeliveryLastAttemptAt",
    "pendingFinalDeliveryAttemptCount",
    "pendingFinalDeliveryLastError",
    "pendingFinalDeliveryPayload",
    "deliverySuspendedAt",
    "deliverySuspendedReason",
    "deliveryDiscardedAt",
    "deliveryDiscardReason",
    "deliveryDiscardedPayloadSummary",
    "completionEnqueuedAt",
    "completionDeliveredAt",
    "completionAnnouncedAt",
    "lastAnnounceDropReason",
  ] as const;
  for (const key of migratedLegacyKeys) {
    delete legacy[key];
  }

  return entry;
}
/**
 * Combines nested execution state already on the record with state rebuilt
 * from the record's top-level fields.
 *
 * @param current Nested state already stored on the record, if any.
 * @param restored State rebuilt from the record's top-level fields.
 * @returns `restored` itself when there is no nested state; otherwise a new
 *   object in which keys present on `current` override `restored`.
 */
function mergeExecutionState(
  current: SubagentExecutionState | undefined,
  restored: SubagentExecutionState,
): SubagentExecutionState {
  if (!current) {
    return restored;
  }
  const merged: SubagentExecutionState = { ...restored, ...current };
  return merged;
}
/**
 * Combines nested completion state already on the record with state rebuilt
 * from legacy fields.
 *
 * Keys that are present on `current` win over `restored`, except keys whose
 * value is `undefined`: those are treated as "not set" so they cannot erase a
 * legacy value that is about to be deleted from the record. This matches how
 * `mergeDeliveryState` treats unset fields. An explicit `null` (for example a
 * `resultText` of `null`) is a real value and is kept.
 *
 * @param current Nested state already stored on the record, if any.
 * @param restored State rebuilt from the legacy flat fields.
 * @returns `restored` itself when there is no nested state; otherwise a new
 *   merged object.
 */
function mergeCompletionState(
  current: SubagentCompletionState | undefined,
  restored: SubagentCompletionState,
): SubagentCompletionState {
  if (!current) {
    return restored;
  }
  // Drop undefined-valued keys so the spread below cannot clobber restored values.
  const definedCurrent = Object.fromEntries(
    Object.entries(current).filter(([, value]) => value !== undefined),
  ) as Partial<SubagentCompletionState>;
  return {
    ...restored,
    ...definedCurrent,
    required: current.required ?? restored.required,
  };
}
/**
 * Combines nested delivery state already on the record with state rebuilt
 * from legacy fields.
 *
 * For every listed field the nested value is used unless it is `null` or
 * `undefined`, in which case the rebuilt value is used. The status follows the
 * nested state, with one exception: a nested `"not_required"` is replaced by
 * the rebuilt status when the run has not opted out of completion messages
 * (`expectsCompletionMessage !== false`) and the rebuilt status is something
 * other than `"not_required"`.
 *
 * @param entry Run record the states belong to; only `expectsCompletionMessage` is read.
 * @param current Nested state already stored on the record, if any.
 * @param restored State rebuilt from the legacy flat fields.
 * @returns `restored` itself when there is no nested state; otherwise a new
 *   merged object.
 */
function mergeDeliveryState(
  entry: SubagentRunRecord,
  current: SubagentCompletionDeliveryState | undefined,
  restored: SubagentCompletionDeliveryState,
): SubagentCompletionDeliveryState {
  if (!current) {
    return restored;
  }
  const nested = current;

  const restoredRequiresDelivery =
    entry.expectsCompletionMessage !== false && restored.status !== "not_required";
  let status = nested.status;
  if (status === "not_required" && restoredRequiresDelivery) {
    status = restored.status;
  }

  // Nested value first, rebuilt value as the fallback for null/undefined.
  const preferNested = <K extends keyof SubagentCompletionDeliveryState>(key: K) =>
    nested[key] ?? restored[key];

  return {
    ...restored,
    ...nested,
    status,
    payload: preferNested("payload"),
    createdAt: preferNested("createdAt"),
    enqueuedAt: preferNested("enqueuedAt"),
    deliveredAt: preferNested("deliveredAt"),
    announcedAt: preferNested("announcedAt"),
    lastAttemptAt: preferNested("lastAttemptAt"),
    attemptCount: preferNested("attemptCount"),
    lastError: preferNested("lastError"),
    suspendedAt: preferNested("suspendedAt"),
    suspendedReason: preferNested("suspendedReason"),
    discardedAt: preferNested("discardedAt"),
    discardReason: preferNested("discardReason"),
    discardedPayloadSummary: preferNested("discardedPayloadSummary"),
    lastDropReason: preferNested("lastDropReason"),
  };
}
/**
 * Derives execution state from the record's top-level timing fields.
 *
 * @param entry Run record to read `startedAt`, `endedAt` and `outcome` from.
 * @returns A `"terminal"` state (with `endedAt` and `outcome`) when `endedAt`
 *   is a number, otherwise a `"running"` state carrying only `startedAt`.
 */
function buildExecutionState(entry: SubagentRunRecord): SubagentExecutionState {
  const { startedAt, endedAt } = entry;
  if (typeof endedAt !== "number") {
    return { status: "running", startedAt };
  }
  return {
    status: "terminal",
    startedAt,
    endedAt,
    outcome: entry.outcome,
  };
}
/**
 * Rebuilds completion state from the legacy frozen-result fields.
 *
 * Text fields are copied whenever they are not `undefined` (so `null` is
 * preserved); timestamp fields are copied only when they are numbers.
 *
 * @param entry Run record; `required` is true only when
 *   `expectsCompletionMessage` is exactly `true`.
 * @param legacy The same record viewed with its legacy flat fields.
 * @returns A new completion state containing only the fields that were present.
 */
function buildCompletionState(
  entry: SubagentRunRecord,
  legacy: LegacySubagentRunRecord,
): SubagentCompletionState {
  const completion: SubagentCompletionState = {
    required: entry.expectsCompletionMessage === true,
  };
  if (legacy.frozenResultText !== undefined) {
    completion.resultText = legacy.frozenResultText;
  }
  if (typeof legacy.frozenResultCapturedAt === "number") {
    completion.capturedAt = legacy.frozenResultCapturedAt;
  }
  if (legacy.fallbackFrozenResultText !== undefined) {
    completion.fallbackResultText = legacy.fallbackFrozenResultText;
  }
  if (typeof legacy.fallbackFrozenResultCapturedAt === "number") {
    completion.fallbackCapturedAt = legacy.fallbackFrozenResultCapturedAt;
  }
  return completion;
}
/**
 * Rebuilds delivery state from the legacy flat fields.
 *
 * Branches are checked in this order and the first match wins:
 * 1. `expectsCompletionMessage === false` -> `"not_required"`
 * 2. numeric `deliveryDiscardedAt`         -> `"discarded"`
 * 3. numeric `deliverySuspendedAt`         -> `"suspended"`
 * 4. numeric `completionAnnouncedAt`       -> `"delivered"`
 * 5. pending flag or pending payload       -> `"pending"`
 * 6. otherwise `"pending"` when the run has a numeric `endedAt`, else
 *    `"not_required"`.
 *
 * The `"not_required"` branch keeps the legacy announce retry bookkeeping
 * (attempt count, last attempt time, last error, last drop reason) when it is
 * present. Cleanup retry backoff reads the attempt count for non-completion
 * flows too, so dropping it during migration would restart the retry budget.
 * When no such bookkeeping exists the result is exactly
 * `{ status: "not_required" }`.
 *
 * @param entry Run record; `expectsCompletionMessage` and `endedAt` are read.
 * @param legacy The same record viewed with its legacy flat fields.
 * @returns A new delivery state object.
 */
function buildDeliveryState(
  entry: SubagentRunRecord,
  legacy: LegacySubagentRunRecord,
): SubagentCompletionDeliveryState {
  if (entry.expectsCompletionMessage === false) {
    // Only include fields that actually exist so records without retry history
    // keep the minimal shape.
    return {
      status: "not_required",
      ...(typeof legacy.lastAnnounceRetryAt === "number"
        ? { lastAttemptAt: legacy.lastAnnounceRetryAt }
        : {}),
      ...(typeof legacy.announceRetryCount === "number"
        ? { attemptCount: legacy.announceRetryCount }
        : {}),
      ...(typeof legacy.lastAnnounceDeliveryError === "string"
        ? { lastError: legacy.lastAnnounceDeliveryError }
        : {}),
      ...(legacy.lastAnnounceDropReason !== undefined
        ? { lastDropReason: legacy.lastAnnounceDropReason }
        : {}),
    };
  }
  if (typeof legacy.deliveryDiscardedAt === "number") {
    return {
      status: "discarded",
      discardedAt: legacy.deliveryDiscardedAt,
      discardReason: legacy.deliveryDiscardReason,
      discardedPayloadSummary: legacy.deliveryDiscardedPayloadSummary,
    };
  }
  if (typeof legacy.deliverySuspendedAt === "number") {
    return {
      status: "suspended",
      payload: legacy.pendingFinalDeliveryPayload,
      createdAt: legacy.pendingFinalDeliveryCreatedAt,
      // pendingFinalDelivery* values take precedence over the older announce* counters.
      lastAttemptAt: legacy.pendingFinalDeliveryLastAttemptAt ?? legacy.lastAnnounceRetryAt,
      attemptCount: legacy.pendingFinalDeliveryAttemptCount ?? legacy.announceRetryCount,
      lastError: legacy.pendingFinalDeliveryLastError ?? legacy.lastAnnounceDeliveryError ?? null,
      suspendedAt: legacy.deliverySuspendedAt,
      suspendedReason: legacy.deliverySuspendedReason,
      lastDropReason: legacy.lastAnnounceDropReason,
    };
  }
  if (typeof legacy.completionAnnouncedAt === "number") {
    return {
      status: "delivered",
      enqueuedAt: legacy.completionEnqueuedAt,
      // Fall back to the announce time when no separate delivered time was recorded.
      deliveredAt: legacy.completionDeliveredAt ?? legacy.completionAnnouncedAt,
      announcedAt: legacy.completionAnnouncedAt,
      lastDropReason: legacy.lastAnnounceDropReason,
    };
  }
  if (legacy.pendingFinalDelivery === true || legacy.pendingFinalDeliveryPayload) {
    return {
      status: "pending",
      payload: legacy.pendingFinalDeliveryPayload,
      createdAt: legacy.pendingFinalDeliveryCreatedAt,
      lastAttemptAt: legacy.pendingFinalDeliveryLastAttemptAt ?? legacy.lastAnnounceRetryAt,
      attemptCount: legacy.pendingFinalDeliveryAttemptCount ?? legacy.announceRetryCount,
      lastError: legacy.pendingFinalDeliveryLastError ?? legacy.lastAnnounceDeliveryError ?? null,
      enqueuedAt: legacy.completionEnqueuedAt,
      deliveredAt: legacy.completionDeliveredAt,
      lastDropReason: legacy.lastAnnounceDropReason,
    };
  }
  return {
    status: typeof entry.endedAt === "number" ? "pending" : "not_required",
    enqueuedAt: legacy.completionEnqueuedAt,
    deliveredAt: legacy.completionDeliveredAt,
    lastAttemptAt: legacy.lastAnnounceRetryAt,
    attemptCount: legacy.announceRetryCount,
    lastError: legacy.lastAnnounceDeliveryError ?? null,
    lastDropReason: legacy.lastAnnounceDropReason,
  };
}
/**
 * Returns the record's completion state, creating it first when it is missing.
 *
 * A newly created state is marked `required` only when
 * `expectsCompletionMessage` is exactly `true`.
 *
 * @param entry Run record; `entry.completion` is assigned when absent.
 * @returns The completion state object stored on the record.
 */
export function ensureCompletionState(entry: SubagentRunRecord): SubagentCompletionState {
  if (entry.completion == null) {
    entry.completion = { required: entry.expectsCompletionMessage === true };
  }
  return entry.completion;
}
/**
 * Returns the record's delivery state, creating it first when it is missing.
 *
 * A newly created state is `"not_required"` when `expectsCompletionMessage`
 * is exactly `false`, and `"pending"` otherwise.
 *
 * @param entry Run record; `entry.delivery` is assigned when absent.
 * @returns The delivery state object stored on the record.
 */
export function ensureDeliveryState(entry: SubagentRunRecord): SubagentCompletionDeliveryState {
  if (entry.delivery == null) {
    const optedOut = entry.expectsCompletionMessage === false;
    entry.delivery = { status: optedOut ? "not_required" : "pending" };
  }
  return entry.delivery;
}
/**
 * Replaces the record's delivery state with a fresh one that carries only a
 * status: `"not_required"` when `expectsCompletionMessage` is exactly `false`,
 * `"pending"` otherwise. Any previous delivery bookkeeping is discarded.
 *
 * @param entry Run record whose `delivery` is overwritten.
 */
export function clearDeliveryState(entry: SubagentRunRecord): void {
  const status = entry.expectsCompletionMessage === false ? "not_required" : "pending";
  entry.delivery = { status };
}
/**
 * Reports whether delivery for this run is suspended.
 *
 * @param entry Run record to inspect.
 * @returns `true` only when the delivery status is `"suspended"` and
 *   `suspendedAt` is a number.
 */
export function isDeliverySuspended(entry: SubagentRunRecord): boolean {
  const delivery = entry.delivery;
  if (delivery?.status !== "suspended") {
    return false;
  }
  return typeof delivery.suspendedAt === "number";
}
/**
 * Reads the number of delivery attempts recorded for this run.
 *
 * @param entry Run record to inspect.
 * @returns `delivery.attemptCount`, or `0` when there is no delivery state or
 *   no count has been recorded.
 */
export function getDeliveryAttemptCount(entry: SubagentRunRecord): number {
  const attempts = entry.delivery?.attemptCount;
  return attempts == null ? 0 : attempts;
}
/**
 * Reads the time of the most recent delivery attempt.
 *
 * @param entry Run record to inspect.
 * @returns `delivery.lastAttemptAt`, or `undefined` when there is no delivery state.
 */
export function getDeliveryLastAttemptAt(entry: SubagentRunRecord): number | undefined {
  const { delivery } = entry;
  return delivery == null ? undefined : delivery.lastAttemptAt;
}
/**
 * Reads the last delivery error, treating blank values as absent.
 *
 * @param entry Run record to inspect.
 * @returns The stored error string unchanged (not trimmed) when it contains
 *   at least one non-whitespace character; otherwise `undefined`.
 */
export function getDeliveryLastError(entry: SubagentRunRecord): string | undefined {
  const lastError = entry.delivery?.lastError;
  if (typeof lastError !== "string") {
    return undefined;
  }
  return lastError.trim().length > 0 ? lastError : undefined;
}

View File

@@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import * as sessions from "../config/sessions.js";
import * as gateway from "../gateway/call.js";
import * as sessionUtils from "../gateway/session-utils.fs.js";
import { resolveInternalSessionEffectsTranscriptPath } from "./internal-session-effects.js";
import * as announceDelivery from "./subagent-announce-delivery.js";
import {
recoverOrphanedSubagentSessions,
@@ -178,6 +179,12 @@ describe("subagent-orphan-recovery", () => {
expect(replaceParams.previousRunId).toBe("run-1");
expect(replaceParams.nextRunId).toBe("test-run-id");
expect(replaceParams.fallback).toBe(run);
expect(replaceParams.transcriptFile).toBe(
resolveInternalSessionEffectsTranscriptPath("test-run-id"),
);
expect(replaceParams.transcriptFile).not.toBe(
resolveInternalSessionEffectsTranscriptPath(params.idempotencyKey as string),
);
});
it("skips sessions that are not aborted", async () => {
@@ -532,39 +539,22 @@ describe("subagent-orphan-recovery", () => {
expect(message).toContain("config changes from your previous run were already applied");
});
it("announces recovery-in-progress once when a later retry is attempting resume", async () => {
it("does not send parent-visible recovery-progress announcements on retry", async () => {
mockSingleAbortedSession();
const activeRuns = createActiveRuns(createTestRunRecord());
const notifiedRecoverySessionKeys = new Set<string>();
await recoverOrphanedSubagentSessions({
getActiveRuns: () => activeRuns,
attemptNumber: 2,
maxAttempts: 4,
notifiedRecoverySessionKeys,
});
expect(announceDelivery.deliverSubagentAnnouncement).toHaveBeenCalledOnce();
const announcement = requireRecord(
firstCallParam(
vi.mocked(announceDelivery.deliverSubagentAnnouncement).mock.calls,
"recovery announcement",
),
"recovery announcement params",
);
expect(announcement.requesterSessionKey).toBe("agent:main:quietchat:direct:+1234567890");
expect(announcement.triggerMessage).toContain("Automatic recovery is already in progress");
expect(notifiedRecoverySessionKeys).toEqual(new Set(["agent:main:subagent:test-session-1"]));
expect(announceDelivery.deliverSubagentAnnouncement).not.toHaveBeenCalled();
await recoverOrphanedSubagentSessions({
getActiveRuns: () => activeRuns,
attemptNumber: 3,
maxAttempts: 4,
notifiedRecoverySessionKeys,
});
expect(announceDelivery.deliverSubagentAnnouncement).toHaveBeenCalledOnce();
expect(announceDelivery.deliverSubagentAnnouncement).not.toHaveBeenCalled();
});
it("prevents duplicate resume when updateSessionStore fails", async () => {

View File

@@ -1,10 +1,11 @@
/**
* Post-restart orphan recovery for subagent sessions.
* Post-restart interrupted-run resume for subagent sessions.
*
* After a SIGUSR1 gateway reload aborts in-flight subagent LLM calls,
* this module scans for orphaned sessions (those with `abortedLastRun: true`
* this module scans for interrupted sessions (those with `abortedLastRun: true`
* that are still tracked as active in the subagent registry) and sends a
* synthetic resume message to restart their work.
* synthetic resume message to restart their work. Parent notification is handled
* separately by completion delivery after the child reaches a terminal result.
*
* @see https://github.com/openclaw/openclaw/issues/47711
*/
@@ -22,13 +23,7 @@ import { callGateway } from "../gateway/call.js";
import { readSessionMessagesAsync } from "../gateway/session-utils.fs.js";
import { formatErrorMessage } from "../infra/errors.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { buildAnnounceIdempotencyKey } from "./announce-idempotency.js";
import {
deliverSubagentAnnouncement,
isInternalAnnounceRequesterSession,
loadRequesterSessionEntry,
} from "./subagent-announce-delivery.js";
import { resolveAnnounceOrigin } from "./subagent-announce-origin.js";
import { resolveInternalSessionEffectsTranscriptPath } from "./internal-session-effects.js";
import {
evaluateSubagentRecoveryGate,
markSubagentRecoveryAttempt,
@@ -40,12 +35,12 @@ import {
} from "./subagent-registry-steer-runtime.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
const log = createSubsystemLogger("subagent-orphan-recovery");
const log = createSubsystemLogger("subagent-interrupted-resume");
/** Delay before attempting recovery to let the gateway finish bootstrapping. */
const DEFAULT_RECOVERY_DELAY_MS = 5_000;
function isRestartAbortedTimeoutRun(
function isLegacyRestartInterruptedTimeout(
runRecord: SubagentRunRecord,
entry: SessionEntry | undefined,
): boolean {
@@ -57,6 +52,21 @@ function isRestartAbortedTimeoutRun(
);
}
/**
 * Rewrites a run record that was stored as ended so that it reads as
 * interrupted by a gateway restart instead.
 *
 * The previous `endedAt` becomes `execution.interruptedAt`; the terminal
 * fields (`endedAt`, `endedReason`, `outcome`) are cleared both on the record
 * and inside `execution`. Other existing `execution` fields are kept.
 *
 * @param runRecord Run record to rewrite; it is mutated.
 */
function reclassifyLegacyRestartInterruptedRun(runRecord: SubagentRunRecord): void {
  // Capture both values before the terminal fields are cleared below.
  const { endedAt: interruptedAt, execution: previousExecution } = runRecord;

  runRecord.endedAt = undefined;
  runRecord.endedReason = undefined;
  runRecord.outcome = undefined;

  runRecord.execution = {
    ...previousExecution,
    status: "interrupted",
    interruptedAt,
    interruptionReason: "gateway-restart",
    endedAt: undefined,
    outcome: undefined,
  };
}
/**
* Build the resume message for an orphaned subagent.
*/
@@ -76,75 +86,6 @@ function buildResumeMessage(task: string, lastHumanMessage?: string): string {
return message;
}
/**
 * Builds the prompt asking the requester agent to tell the user that an
 * interrupted subagent task is being retried automatically.
 *
 * @param params.task Task description; anything beyond 160 characters is cut
 *   off and replaced with `...`.
 * @param params.attemptNumber Attempt number shown as the first half of `retry a/b`.
 * @param params.maxAttempts Attempt limit shown as the second half of `retry a/b`.
 * @returns The complete prompt as a single string.
 */
function buildRecoveryProgressPrompt(params: {
  task: string;
  attemptNumber: number;
  maxAttempts: number;
}): string {
  const { task, attemptNumber, maxAttempts } = params;
  const taskLabelLimit = 160;

  let taskLabel = task;
  if (task.length > taskLabelLimit) {
    taskLabel = `${task.slice(0, taskLabelLimit)}...`;
  }

  const segments = [
    `A spawned subagent task was interrupted by a gateway restart or connection loss. `,
    `Automatic recovery is already in progress for "${taskLabel}" `,
    `(retry ${attemptNumber}/${maxAttempts}). `,
    `Send one brief update now in your normal voice: say the task was interrupted, `,
    `you are automatically resuming/retrying it, and you will report back when it either continues or truly fails. `,
    `Do not say the task has failed.`,
  ];
  return segments.join("");
}
/**
 * Sends a recovery-progress announcement for an interrupted subagent run to
 * the session that requested it.
 *
 * @param params.runRecord Run whose requester is notified.
 * @param params.attemptNumber Attempt number included in the prompt text.
 * @param params.maxAttempts Attempt limit included in the prompt text.
 * @returns The `delivered` flag reported by `deliverSubagentAnnouncement`;
 *   `false` when the run has no requester session key or when delivery throws.
 */
async function announceRecoveryInProgress(params: {
runRecord: SubagentRunRecord;
attemptNumber: number;
maxAttempts: number;
}): Promise<boolean> {
const requesterSessionKey = params.runRecord.requesterSessionKey?.trim();
if (!requesterSessionKey) {
return false;
}
const requesterOrigin = params.runRecord.requesterOrigin;
const requesterIsSubagent = isInternalAnnounceRequesterSession(requesterSessionKey);
// For requesters that are not internal sessions, the direct origin is re-resolved
// from the requester's stored session entry; otherwise the recorded origin is used as is.
let directOrigin = requesterOrigin;
if (!requesterIsSubagent) {
const { entry } = loadRequesterSessionEntry(requesterSessionKey);
directOrigin = resolveAnnounceOrigin(entry, requesterOrigin);
}
// The run label is preferred over the raw task text when one is set.
const prompt = buildRecoveryProgressPrompt({
task: params.runRecord.label || params.runRecord.task,
attemptNumber: params.attemptNumber,
maxAttempts: params.maxAttempts,
});
try {
const delivery = await deliverSubagentAnnouncement({
requesterSessionKey,
// The announce id depends only on the run id, not on the attempt number.
// NOTE(review): presumably this lets repeated notices for one run be deduplicated — confirm.
announceId: `${params.runRecord.runId}:recovery-progress`,
triggerMessage: prompt,
steerMessage: prompt,
summaryLine: params.runRecord.label || params.runRecord.task,
requesterSessionOrigin: requesterOrigin,
requesterOrigin,
completionDirectOrigin: requesterOrigin,
directOrigin,
sourceSessionKey: params.runRecord.childSessionKey,
sourceTool: "subagent_orphan_recovery",
targetRequesterSessionKey: requesterSessionKey,
requesterIsSubagent,
expectsCompletionMessage: false,
bestEffortDeliver: true,
directIdempotencyKey: buildAnnounceIdempotencyKey(
`${params.runRecord.runId}:recovery-progress`,
),
});
return delivery.delivered;
} catch {
// Any delivery error is swallowed and reported to the caller as "not delivered".
return false;
}
}
function extractMessageText(msg: unknown): string | undefined {
if (!msg || typeof msg !== "object") {
return undefined;
@@ -187,14 +128,23 @@ async function resumeOrphanedSession(params: {
}
try {
const idempotencyKey = crypto.randomUUID();
const result = await callGateway<{ runId: string }>({
method: "agent",
params: {
message: resumeMessage,
sessionKey: params.sessionKey,
idempotencyKey: crypto.randomUUID(),
idempotencyKey,
deliver: false,
lane: "subagent",
inputProvenance: {
kind: "inter_session",
sourceSessionKey: params.originalRun.requesterSessionKey,
sourceChannel: "internal",
sourceTool: "subagent_interrupted_resume",
},
sessionEffects: "internal",
suppressPromptPersistence: true,
},
timeoutMs: 10_000,
});
@@ -202,6 +152,7 @@ async function resumeOrphanedSession(params: {
previousRunId: params.originalRunId,
nextRunId: result.runId,
fallback: params.originalRun,
transcriptFile: resolveInternalSessionEffectsTranscriptPath(result.runId),
});
if (!remapped) {
log.warn(
@@ -233,12 +184,6 @@ export async function recoverOrphanedSubagentSessions(params: {
getActiveRuns: () => Map<string, SubagentRunRecord>;
/** Persisted across retries so already-resumed sessions are not resumed again. */
resumedSessionKeys?: Set<string>;
/** Human-visible attempt number for this recovery pass. */
attemptNumber?: number;
/** Total recovery attempts before giving up. */
maxAttempts?: number;
/** Persisted across retries so recovery-in-progress notices stay deduped. */
notifiedRecoverySessionKeys?: Set<string>;
}): Promise<{
recovered: number;
failed: number;
@@ -252,9 +197,6 @@ export async function recoverOrphanedSubagentSessions(params: {
failedRuns: [] as Array<{ runId: string; childSessionKey: string; error?: string }>,
};
const resumedSessionKeys = params.resumedSessionKeys ?? new Set<string>();
const attemptNumber = Math.max(1, params.attemptNumber ?? 1);
const maxAttempts = Math.max(attemptNumber, params.maxAttempts ?? attemptNumber);
const notifiedRecoverySessionKeys = params.notifiedRecoverySessionKeys ?? new Set<string>();
const configChangePattern = /openclaw\.json|openclaw gateway restart|config\.patch/i;
try {
@@ -293,13 +235,14 @@ export async function recoverOrphanedSubagentSessions(params: {
continue;
}
// Restart-aborted subagents can be marked ended with a timeout outcome
// before the gateway comes back up to resume them.
if (
typeof runRecord.endedAt === "number" &&
runRecord.endedAt > 0 &&
!isRestartAbortedTimeoutRun(runRecord, entry)
) {
if (isLegacyRestartInterruptedTimeout(runRecord, entry)) {
reclassifyLegacyRestartInterruptedRun(runRecord);
}
// Terminal child outcomes are immutable. Restart resume only applies to
// non-terminal interrupted execution; delivery retry handles terminal
// child results separately.
if (typeof runRecord.endedAt === "number" && runRecord.endedAt > 0) {
result.skipped++;
continue;
}
@@ -371,17 +314,6 @@ export async function recoverOrphanedSubagentSessions(params: {
return typeof text === "string" && configChangePattern.test(text);
});
if (attemptNumber > 1 && !notifiedRecoverySessionKeys.has(childSessionKey)) {
const notified = await announceRecoveryInProgress({
runRecord,
attemptNumber,
maxAttempts,
});
if (notified) {
notifiedRecoverySessionKeys.add(childSessionKey);
}
}
// Resume the session with the original task context.
// We intentionally do NOT clear abortedLastRun before attempting
// the resume — if callGateway fails (e.g. gateway still booting),
@@ -492,16 +424,11 @@ export function scheduleOrphanRecovery(params: {
const maxRetries = params.maxRetries ?? MAX_RECOVERY_RETRIES;
const resumedSessionKeys = new Set<string>();
const notifiedRecoverySessionKeys = new Set<string>();
const attemptRecovery = (attempt: number, delay: number) => {
setTimeout(() => {
void recoverOrphanedSubagentSessions({
...params,
resumedSessionKeys,
attemptNumber: attempt + 1,
maxAttempts: maxRetries + 1,
notifiedRecoverySessionKeys,
})
.then((result) => {
if (result.failed > 0 && attempt < maxRetries) {

View File

@@ -68,7 +68,10 @@ describe("resolveDeferredCleanupDecision", () => {
it("uses retry backoff for completion-message flows once descendants are settled", () => {
const decision = resolveDecision({
entry: makeEntry({ expectsCompletionMessage: true, announceRetryCount: 1 }),
entry: makeEntry({
expectsCompletionMessage: true,
delivery: { status: "pending", attemptCount: 1 },
}),
activeDescendantRuns: 0,
resolveAnnounceRetryDelayMs: (retryCount) => retryCount * 1_000,
});
@@ -78,7 +81,10 @@ describe("resolveDeferredCleanupDecision", () => {
it("uses retry backoff for non-completion flows so cleanup can settle after announce failures", () => {
const decision = resolveDecision({
entry: makeEntry({ expectsCompletionMessage: false, announceRetryCount: 1 }),
entry: makeEntry({
expectsCompletionMessage: false,
delivery: { status: "not_required", attemptCount: 1 },
}),
activeDescendantRuns: 0,
resolveAnnounceRetryDelayMs: (retryCount) => retryCount * 1_000,
});

View File

@@ -1,3 +1,4 @@
import { getDeliveryAttemptCount } from "./subagent-delivery-state.js";
import {
SUBAGENT_ENDED_REASON_COMPLETE,
type SubagentLifecycleEndedReason,
@@ -51,7 +52,7 @@ export function resolveDeferredCleanupDecision(params: {
return { kind: "defer-descendants", delayMs: params.deferDescendantDelayMs };
}
const retryCount = (params.entry.announceRetryCount ?? 0) + 1;
const retryCount = getDeliveryAttemptCount(params.entry) + 1;
const expiryExceeded = isCompletionMessageFlow
? completionHardExpiryExceeded
: endedAgo > params.announceExpiryMs;

View File

@@ -65,8 +65,11 @@ describe("logAnnounceGiveUp", () => {
const logSpy = vi.spyOn(defaultRuntime, "log").mockImplementation(() => {});
const entry = createRunEntry({
endedAt: 4_000,
announceRetryCount: 3,
lastAnnounceDeliveryError: "direct-primary: routed-dispatch-did-not-queue-final",
delivery: {
status: "failed",
attemptCount: 3,
lastError: "direct-primary: routed-dispatch-did-not-queue-final",
},
});
logAnnounceGiveUp(entry, "retry-limit");
@@ -80,7 +83,10 @@ describe("logAnnounceGiveUp", () => {
it("normalizes multiline delivery errors onto one gateway log line", () => {
const logSpy = vi.spyOn(defaultRuntime, "log").mockImplementation(() => {});
const entry = createRunEntry({
lastAnnounceDeliveryError: "gateway timeout\nphase: routed dispatch failed",
delivery: {
status: "failed",
lastError: "gateway timeout\nphase: routed dispatch failed",
},
});
logAnnounceGiveUp(entry, "expiry");

View File

@@ -13,6 +13,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js";
import { defaultRuntime } from "../runtime.js";
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
import { withSubagentOutcomeTiming } from "./subagent-announce-output.js";
import { getDeliveryAttemptCount, getDeliveryLastError } from "./subagent-delivery-state.js";
import { SUBAGENT_ENDED_REASON_ERROR } from "./subagent-lifecycle-events.js";
import { shouldUpdateRunOutcome } from "./subagent-registry-completion.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
@@ -71,12 +72,13 @@ function formatAnnounceGiveUpLogField(value: string): string {
}
export function logAnnounceGiveUp(entry: SubagentRunRecord, reason: "retry-limit" | "expiry") {
const retryCount = entry.announceRetryCount ?? 0;
const retryCount = getDeliveryAttemptCount(entry);
const endedAgoMs =
typeof entry.endedAt === "number" ? Math.max(0, Date.now() - entry.endedAt) : undefined;
const endedAgoLabel = endedAgoMs != null ? `${Math.round(endedAgoMs / 1000)}s` : "n/a";
const deliveryError = entry.lastAnnounceDeliveryError?.trim()
? ` deliveryError=${formatAnnounceGiveUpLogField(entry.lastAnnounceDeliveryError)}`
const lastDeliveryError = getDeliveryLastError(entry);
const deliveryError = lastDeliveryError
? ` deliveryError=${formatAnnounceGiveUpLogField(lastDeliveryError)}`
: "";
defaultRuntime.log(
`[warn] Subagent announce give up (${reason}) run=${entry.runId} child=${entry.childSessionKey} requester=${entry.requesterSessionKey} retries=${retryCount} endedAgo=${endedAgoLabel}${deliveryError}`,

View File

@@ -573,10 +573,10 @@ describe("subagent registry lifecycle hardening", () => {
}),
).resolves.toBeUndefined();
await vi.waitFor(() => expect(entry.completionAnnouncedAt).toBe(12_300));
expect(entry.completionEnqueuedAt).toBe(4_100);
expect(entry.completionDeliveredAt).toBe(12_300);
expect(entry.lastAnnounceDropReason).toBeUndefined();
await vi.waitFor(() => expect(entry.delivery?.announcedAt).toBe(12_300));
expect(entry.delivery?.enqueuedAt).toBe(4_100);
expect(entry.delivery?.deliveredAt).toBe(12_300);
expect(entry.delivery?.lastDropReason).toBeUndefined();
expectFields(firstCallArg(taskExecutorMocks.setDetachedTaskDeliveryStatusByRunId), {
runId: entry.runId,
deliveryStatus: "delivered",
@@ -611,7 +611,7 @@ describe("subagent registry lifecycle hardening", () => {
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expect(hasDeliveredTaskStatusUpdate(entry.runId)).toBe(false);
await vi.waitFor(() => expect(entry.cleanupCompletedAt).toBeTypeOf("number"));
expect(entry.completionAnnouncedAt).toBeUndefined();
expect(entry.delivery?.announcedAt).toBeUndefined();
});
it("archives delete-mode sessions when completion messages are disabled", async () => {
@@ -655,7 +655,7 @@ describe("subagent registry lifecycle hardening", () => {
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expect(hasDeliveredTaskStatusUpdate(entry.runId)).toBe(false);
await vi.waitFor(() => expect(runs.has(entry.runId)).toBe(false));
expect(entry.completionAnnouncedAt).toBeUndefined();
expect(entry.delivery?.announcedAt).toBeUndefined();
});
it("retires bundle MCP runtimes when run-mode cleanup completes", async () => {
@@ -832,7 +832,7 @@ describe("subagent registry lifecycle hardening", () => {
).resolves.toBeUndefined();
expect(captureSubagentCompletionReply).not.toHaveBeenCalled();
expect(entry.frozenResultText).toBeNull();
expect(entry.completion?.resultText).toBeNull();
expectFields(firstCallArg(taskExecutorMocks.failTaskRunByRunId), {
status: "failed",
error: "All models failed (2): timeout",
@@ -843,7 +843,7 @@ describe("subagent registry lifecycle hardening", () => {
it("does not re-run announce flow after completion was already delivered", async () => {
const entry = createRunEntry({
completionAnnouncedAt: 3_500,
delivery: { status: "delivered", announcedAt: 3_500, deliveredAt: 3_500 },
endedAt: 4_000,
});
const persist = vi.fn();
@@ -880,7 +880,7 @@ describe("subagent registry lifecycle hardening", () => {
it("emits ended hook while retrying cleanup after completion was already delivered", async () => {
const entry = createRunEntry({
completionAnnouncedAt: 3_500,
delivery: { status: "delivered", announcedAt: 3_500, deliveredAt: 3_500 },
endedAt: 4_000,
expectsCompletionMessage: true,
});
@@ -924,7 +924,7 @@ describe("subagent registry lifecycle hardening", () => {
captureSubagentCompletionReply: vi.fn(async () => undefined),
});
expect(entry.completionAnnouncedAt).toBeUndefined();
expect(entry.delivery?.announcedAt).toBeUndefined();
await controller.finalizeResumedAnnounceGiveUp({
runId: entry.runId,
@@ -942,8 +942,8 @@ describe("subagent registry lifecycle hardening", () => {
endedAt: 4_000,
endedReason: SUBAGENT_ENDED_REASON_COMPLETE,
expectsCompletionMessage: true,
frozenResultText: "final answer",
lastAnnounceDeliveryError: "gateway request timeout for agent",
completion: { required: true, resultText: "final answer" },
delivery: { status: "pending", lastError: "gateway request timeout for agent" },
outcome: { status: "ok" },
retainAttachmentsOnKeep: true,
});
@@ -960,15 +960,15 @@ describe("subagent registry lifecycle hardening", () => {
reason: "retry-limit",
});
expect(entry.pendingFinalDelivery).toBe(true);
expect(entry.pendingFinalDeliveryPayload).toMatchObject({
expect(entry.delivery?.status).toBe("suspended");
expect(entry.delivery?.payload).toMatchObject({
requesterSessionKey: entry.requesterSessionKey,
childSessionKey: entry.childSessionKey,
childRunId: entry.runId,
frozenResultText: "final answer",
});
expect(entry.deliverySuspendedAt).toBeTypeOf("number");
expect(entry.deliverySuspendedReason).toBe("retry-limit");
expect(entry.delivery?.suspendedAt).toBeTypeOf("number");
expect(entry.delivery?.suspendedReason).toBe("retry-limit");
expect(entry.cleanupHandled).toBe(false);
expect(entry.cleanupCompletedAt).toBeUndefined();
expect(helperMocks.safeRemoveAttachmentsDir).not.toHaveBeenCalled();
@@ -1015,7 +1015,7 @@ describe("subagent registry lifecycle hardening", () => {
endedAt: 4_000,
endedReason,
expectsCompletionMessage: true,
lastAnnounceDeliveryError: "gateway request timeout for agent",
delivery: { status: "pending", lastError: "gateway request timeout for agent" },
outcome,
retainAttachmentsOnKeep: true,
});
@@ -1032,10 +1032,9 @@ describe("subagent registry lifecycle hardening", () => {
reason: "retry-limit",
});
expect(entry.pendingFinalDelivery).toBeUndefined();
expect(entry.pendingFinalDeliveryPayload).toBeUndefined();
expect(entry.deliverySuspendedAt).toBeUndefined();
expect(entry.deliverySuspendedReason).toBeUndefined();
expect(entry.delivery?.payload).toBeUndefined();
expect(entry.delivery?.suspendedAt).toBeUndefined();
expect(entry.delivery?.suspendedReason).toBeUndefined();
expect(entry.cleanupCompletedAt).toBeTypeOf("number");
expect(persist).toHaveBeenCalled();
},
@@ -1152,17 +1151,17 @@ describe("subagent registry lifecycle hardening", () => {
error:
"UNAVAILABLE: requester wake failed; direct-primary: UNAVAILABLE: requester wake failed",
});
expect(entry.lastAnnounceDeliveryError).toBe(
expect(entry.delivery?.lastError).toBe(
"UNAVAILABLE: requester wake failed; direct-primary: UNAVAILABLE: requester wake failed",
);
expect(entry.pendingFinalDelivery).toBe(true);
expect(entry.pendingFinalDeliveryPayload).toMatchObject({
expect(entry.delivery?.status).toBe("suspended");
expect(entry.delivery?.payload).toMatchObject({
requesterSessionKey: entry.requesterSessionKey,
childSessionKey: entry.childSessionKey,
childRunId: entry.runId,
});
expect(entry.deliverySuspendedAt).toBeTypeOf("number");
expect(entry.deliverySuspendedReason).toBe("retry-limit");
expect(entry.delivery?.suspendedAt).toBeTypeOf("number");
expect(entry.delivery?.suspendedReason).toBe("retry-limit");
expect(entry.cleanupCompletedAt).toBeUndefined();
expectFields(
findCallArg(
@@ -1190,11 +1189,11 @@ describe("subagent registry lifecycle hardening", () => {
params: { sessionKey: entry.requesterSessionKey, limit: 25, maxChars: 128 * 1024 },
timeoutMs: 5_000,
});
expect(entry.completionDeliveredAt).toBe(12_345);
expect(entry.completionAnnouncedAt).toBe(12_345);
expect(entry.lastAnnounceDeliveryError).toBeUndefined();
expect(entry.pendingFinalDelivery).toBeUndefined();
expect(entry.announceRetryCount).toBeUndefined();
expect(entry.delivery?.deliveredAt).toBe(12_345);
expect(entry.delivery?.announcedAt).toBe(12_345);
expect(entry.delivery?.lastError).toBeUndefined();
expect(entry.delivery?.payload).toBeUndefined();
expect(entry.delivery?.attemptCount).toBeUndefined();
expect(hasDeliveredTaskStatusUpdate(entry.runId)).toBe(true);
expect(helperMocks.logAnnounceGiveUp).not.toHaveBeenCalled();
@@ -1206,7 +1205,7 @@ describe("subagent registry lifecycle hardening", () => {
});
await vi.waitFor(() => expect(longMirrorEntry.cleanupCompletedAt).toBeTypeOf("number"));
expect(longMirrorEntry.completionDeliveredAt).toBe(12_345);
expect(longMirrorEntry.delivery?.deliveredAt).toBe(12_345);
expect(gatewayMocks.callGateway).toHaveBeenCalledWith({
method: "chat.history",
params: { sessionKey: longMirrorEntry.requesterSessionKey, limit: 25, maxChars: 128 * 1024 },
@@ -1224,7 +1223,7 @@ describe("subagent registry lifecycle hardening", () => {
await vi.waitFor(() =>
expect(messageToolAnnounceEntry.cleanupCompletedAt).toBeTypeOf("number"),
);
expect(messageToolAnnounceEntry.completionDeliveredAt).toBe(12_345);
expect(messageToolAnnounceEntry.delivery?.deliveredAt).toBe(12_345);
vi.clearAllMocks();
gatewayMocks.callGateway.mockResolvedValue({});
@@ -1234,19 +1233,17 @@ describe("subagent registry lifecycle hardening", () => {
});
await vi.waitFor(() => expect(childRunMirrorEntry.cleanupCompletedAt).toBeTypeOf("number"));
expect(childRunMirrorEntry.completionDeliveredAt).toBe(12_345);
expect(childRunMirrorEntry.delivery?.deliveredAt).toBe(12_345);
vi.clearAllMocks();
taskExecutorMocks.setDetachedTaskDeliveryStatusByRunId.mockReset();
gatewayMocks.callGateway.mockResolvedValue({});
const staleEntry = await runNoReplyMirrorScenario({ timestamp: 1_999 });
await vi.waitFor(() => expect(staleEntry.deliverySuspendedAt).toBeTypeOf("number"));
expect(staleEntry.completionDeliveredAt).toBeUndefined();
expect(staleEntry.completionAnnouncedAt).toBeUndefined();
expect(staleEntry.lastAnnounceDeliveryError).toBe(
"completion agent did not produce a visible reply",
);
await vi.waitFor(() => expect(staleEntry.delivery?.suspendedAt).toBeTypeOf("number"));
expect(staleEntry.delivery?.deliveredAt).toBeUndefined();
expect(staleEntry.delivery?.announcedAt).toBeUndefined();
expect(staleEntry.delivery?.lastError).toBe("completion agent did not produce a visible reply");
expect(hasDeliveredTaskStatusUpdate(staleEntry.runId)).toBe(false);
expectFields(firstCallArg(taskExecutorMocks.setDetachedTaskDeliveryStatusByRunId), {
runId: staleEntry.runId,
@@ -1276,10 +1273,12 @@ describe("subagent registry lifecycle hardening", () => {
)}:internal-source-reply:0`,
});
await vi.waitFor(() => expect(sameWindowSiblingEntry.deliverySuspendedAt).toBeTypeOf("number"));
expect(sameWindowSiblingEntry.completionDeliveredAt).toBeUndefined();
expect(sameWindowSiblingEntry.completionAnnouncedAt).toBeUndefined();
expect(sameWindowSiblingEntry.lastAnnounceDeliveryError).toBe(
await vi.waitFor(() =>
expect(sameWindowSiblingEntry.delivery?.suspendedAt).toBeTypeOf("number"),
);
expect(sameWindowSiblingEntry.delivery?.deliveredAt).toBeUndefined();
expect(sameWindowSiblingEntry.delivery?.announcedAt).toBeUndefined();
expect(sameWindowSiblingEntry.delivery?.lastError).toBe(
"completion agent did not produce a visible reply",
);
expect(hasDeliveredTaskStatusUpdate(sameWindowSiblingEntry.runId)).toBe(false);

View File

@@ -20,9 +20,17 @@ import {
buildAnnounceIdFromChildRun,
buildAnnounceIdempotencyKey,
} from "./announce-idempotency.js";
import { removeInternalSessionEffectsTranscript } from "./internal-session-effects.js";
import { retireSessionMcpRuntimeForSessionKey } from "./pi-bundle-mcp-tools.js";
import type { SubagentAnnounceDeliveryResult } from "./subagent-announce-dispatch.js";
import { type SubagentRunOutcome, withSubagentOutcomeTiming } from "./subagent-announce-output.js";
import {
clearDeliveryState,
ensureCompletionState,
ensureDeliveryState,
getDeliveryLastError,
isDeliverySuspended,
} from "./subagent-delivery-state.js";
import {
SUBAGENT_ENDED_REASON_COMPLETE,
type SubagentLifecycleEndedReason,
@@ -162,19 +170,21 @@ export function createSubagentRegistryLifecycleController(params: {
entry: SubagentRunRecord,
delivery: SubagentAnnounceDeliveryResult,
) => {
const deliveryState = ensureDeliveryState(entry);
if (typeof delivery.enqueuedAt === "number") {
entry.completionEnqueuedAt ??= delivery.enqueuedAt;
deliveryState.enqueuedAt ??= delivery.enqueuedAt;
}
if (delivery.delivered) {
const deliveredAt =
typeof delivery.deliveredAt === "number" ? delivery.deliveredAt : Date.now();
entry.completionDeliveredAt = deliveredAt;
entry.lastAnnounceDropReason = undefined;
deliveryState.deliveredAt = deliveredAt;
deliveryState.lastDropReason = undefined;
}
};
const hasPriorRequesterDeliveryMirror = async (entry: SubagentRunRecord): Promise<boolean> => {
const expectedText = extractTextFromChatContent(entry.frozenResultText, { joinWith: "" });
const completion = ensureCompletionState(entry);
const expectedText = extractTextFromChatContent(completion.resultText, { joinWith: "" });
if (entry.expectsCompletionMessage !== true || expectedText == null) {
return false;
}
@@ -229,7 +239,7 @@ export function createSubagentRegistryLifecycleController(params: {
);
});
if (mirror) {
entry.completionDeliveredAt = (mirror as { timestamp: number }).timestamp;
ensureDeliveryState(entry).deliveredAt = (mirror as { timestamp: number }).timestamp;
}
return Boolean(mirror);
} catch {
@@ -269,9 +279,10 @@ export function createSubagentRegistryLifecycleController(params: {
const lastEventAt = endedAt;
try {
if (args.outcome.status === "ok") {
const completion = ensureCompletionState(args.entry);
const terminalResult =
args.entry.expectsCompletionMessage === true
? resolveRequiredCompletionTerminalResult(args.entry.frozenResultText)
? resolveRequiredCompletionTerminalResult(completion.resultText)
: {};
completeTaskRunByRunId({
runId: args.entry.runId,
@@ -279,7 +290,7 @@ export function createSubagentRegistryLifecycleController(params: {
sessionKey: args.entry.childSessionKey,
endedAt,
lastEventAt,
progressSummary: args.entry.frozenResultText ?? undefined,
progressSummary: completion.resultText ?? undefined,
terminalSummary: terminalResult.terminalSummary ?? null,
terminalOutcome: terminalResult.terminalOutcome,
});
@@ -293,7 +304,7 @@ export function createSubagentRegistryLifecycleController(params: {
endedAt,
lastEventAt,
error: args.outcome.status === "error" ? args.outcome.error : undefined,
progressSummary: args.entry.frozenResultText ?? undefined,
progressSummary: ensureCompletionState(args.entry).resultText ?? undefined,
terminalSummary: null,
});
} catch (err) {
@@ -322,7 +333,7 @@ export function createSubagentRegistryLifecycleController(params: {
sessionKey: args.entry.childSessionKey,
endedAt,
lastEventAt: Date.now(),
progressSummary: args.entry.frozenResultText ?? undefined,
progressSummary: ensureCompletionState(args.entry).resultText ?? undefined,
terminalSummary: terminalResult.terminalSummary,
terminalOutcome: terminalResult.terminalOutcome,
});
@@ -339,24 +350,26 @@ export function createSubagentRegistryLifecycleController(params: {
entry: SubagentRunRecord,
outcome: SubagentRunOutcome,
): Promise<boolean> => {
if (entry.frozenResultText !== undefined) {
const completion = ensureCompletionState(entry);
if (completion.resultText !== undefined) {
return false;
}
if (outcome.status === "error") {
entry.frozenResultText = null;
entry.frozenResultCapturedAt = Date.now();
completion.resultText = null;
completion.capturedAt = Date.now();
return true;
}
try {
const captured = await params.captureSubagentCompletionReply(entry.childSessionKey, {
waitForReply: entry.expectsCompletionMessage === true,
outcome,
sessionFile: entry.execution?.transcriptFile,
});
entry.frozenResultText = captured?.trim() ? capFrozenResultText(captured) : null;
completion.resultText = captured?.trim() ? capFrozenResultText(captured) : null;
} catch {
entry.frozenResultText = null;
completion.resultText = null;
}
entry.frozenResultCapturedAt = Date.now();
completion.capturedAt = Date.now();
return true;
};
@@ -407,14 +420,16 @@ export function createSubagentRegistryLifecycleController(params: {
const capturedAt = Date.now();
let changed = false;
for (const entry of candidates) {
if (entry.frozenResultText === nextFrozen) {
const completion = ensureCompletionState(entry);
if (completion.resultText === nextFrozen) {
continue;
}
entry.frozenResultText = nextFrozen;
entry.frozenResultCapturedAt = capturedAt;
if (entry.pendingFinalDeliveryPayload) {
entry.pendingFinalDeliveryPayload = {
...entry.pendingFinalDeliveryPayload,
completion.resultText = nextFrozen;
completion.capturedAt = capturedAt;
const delivery = entry.delivery;
if (delivery?.payload) {
delivery.payload = {
...delivery.payload,
frozenResultText: nextFrozen,
};
}
@@ -446,12 +461,17 @@ export function createSubagentRegistryLifecycleController(params: {
};
/**
 * Resets the pending-final-delivery bookkeeping stored on the nested
 * `entry.delivery` state.
 *
 * The stored payload, creation/attempt tracking, last error, and suspension
 * markers are all cleared. Only the nested delivery state is written; the
 * retired flat `pendingFinalDelivery*` fields on the record are no longer
 * touched, matching the readers elsewhere in this controller that consult
 * `entry.delivery` exclusively.
 *
 * @param entry - Run record whose delivery bookkeeping is reset in place.
 */
const clearPendingFinalDelivery = (entry: SubagentRunRecord) => {
  const delivery = ensureDeliveryState(entry);
  delivery.payload = undefined;
  delivery.createdAt = undefined;
  delivery.lastAttemptAt = undefined;
  delivery.attemptCount = undefined;
  delivery.lastError = undefined;
  delivery.suspendedAt = undefined;
  delivery.suspendedReason = undefined;
  // "delivered" and "failed" are kept on the record; for any other status the
  // record is handed to clearDeliveryState.
  // NOTE(review): presumably that helper drops `entry.delivery` entirely — confirm
  // in subagent-delivery-state.
  if (delivery.status !== "delivered" && delivery.status !== "failed") {
    clearDeliveryState(entry);
  }
};
const loadPendingFinalDeliveryPayload = (
@@ -459,28 +479,25 @@ export function createSubagentRegistryLifecycleController(params: {
): PendingFinalDeliveryPayload => {
return {
requesterSessionKey:
entry.pendingFinalDeliveryPayload?.requesterSessionKey ?? entry.requesterSessionKey,
requesterOrigin: entry.pendingFinalDeliveryPayload?.requesterOrigin ?? entry.requesterOrigin,
entry.delivery?.payload?.requesterSessionKey ?? entry.requesterSessionKey,
requesterOrigin: entry.delivery?.payload?.requesterOrigin ?? entry.requesterOrigin,
requesterDisplayKey:
entry.pendingFinalDeliveryPayload?.requesterDisplayKey ?? entry.requesterDisplayKey,
childSessionKey: entry.pendingFinalDeliveryPayload?.childSessionKey ?? entry.childSessionKey,
childRunId: entry.pendingFinalDeliveryPayload?.childRunId ?? entry.runId,
task: entry.pendingFinalDeliveryPayload?.task ?? entry.task,
label: entry.pendingFinalDeliveryPayload?.label ?? entry.label,
startedAt: entry.pendingFinalDeliveryPayload?.startedAt ?? entry.startedAt,
endedAt: entry.pendingFinalDeliveryPayload?.endedAt ?? entry.endedAt,
outcome: entry.pendingFinalDeliveryPayload?.outcome ?? entry.outcome,
entry.delivery?.payload?.requesterDisplayKey ?? entry.requesterDisplayKey,
childSessionKey: entry.delivery?.payload?.childSessionKey ?? entry.childSessionKey,
childRunId: entry.delivery?.payload?.childRunId ?? entry.runId,
task: entry.delivery?.payload?.task ?? entry.task,
label: entry.delivery?.payload?.label ?? entry.label,
startedAt: entry.delivery?.payload?.startedAt ?? entry.startedAt,
endedAt: entry.delivery?.payload?.endedAt ?? entry.endedAt,
outcome: entry.delivery?.payload?.outcome ?? entry.outcome,
expectsCompletionMessage:
entry.pendingFinalDeliveryPayload?.expectsCompletionMessage ??
entry.expectsCompletionMessage,
spawnMode: entry.pendingFinalDeliveryPayload?.spawnMode ?? entry.spawnMode,
frozenResultText:
entry.pendingFinalDeliveryPayload?.frozenResultText ?? entry.frozenResultText,
entry.delivery?.payload?.expectsCompletionMessage ?? entry.expectsCompletionMessage,
spawnMode: entry.delivery?.payload?.spawnMode ?? entry.spawnMode,
frozenResultText: entry.delivery?.payload?.frozenResultText ?? entry.completion?.resultText,
fallbackFrozenResultText:
entry.pendingFinalDeliveryPayload?.fallbackFrozenResultText ??
entry.fallbackFrozenResultText,
entry.delivery?.payload?.fallbackFrozenResultText ?? entry.completion?.fallbackResultText,
wakeOnDescendantSettle:
entry.pendingFinalDeliveryPayload?.wakeOnDescendantSettle ?? entry.wakeOnDescendantSettle,
entry.delivery?.payload?.wakeOnDescendantSettle ?? entry.wakeOnDescendantSettle,
};
};
@@ -488,13 +505,13 @@ export function createSubagentRegistryLifecycleController(params: {
const now = Date.now();
const payload: PendingFinalDeliveryPayload = loadPendingFinalDeliveryPayload(args.entry);
args.entry.pendingFinalDelivery = true;
args.entry.pendingFinalDeliveryCreatedAt ??= now;
args.entry.pendingFinalDeliveryLastAttemptAt = now;
args.entry.pendingFinalDeliveryAttemptCount =
(args.entry.pendingFinalDeliveryAttemptCount ?? 0) + 1;
args.entry.pendingFinalDeliveryLastError = args.error ?? null;
args.entry.pendingFinalDeliveryPayload = payload;
const delivery = ensureDeliveryState(args.entry);
delivery.status = "pending";
delivery.createdAt ??= now;
delivery.lastAttemptAt = now;
delivery.attemptCount = (delivery.attemptCount ?? 0) + 1;
delivery.lastError = args.error ?? null;
delivery.payload = payload;
};
const suspendPendingFinalDelivery = (args: {
@@ -505,25 +522,28 @@ export function createSubagentRegistryLifecycleController(params: {
}) => {
markPendingFinalDelivery({
entry: args.entry,
error: args.error ?? args.entry.lastAnnounceDeliveryError ?? args.reason,
error: args.error ?? getDeliveryLastError(args.entry) ?? args.reason,
});
const now = Date.now();
args.entry.deliverySuspendedAt ??= now;
args.entry.deliverySuspendedReason = args.reason;
const delivery = ensureDeliveryState(args.entry);
delivery.status = "suspended";
delivery.suspendedAt ??= now;
delivery.suspendedReason = args.reason;
args.entry.cleanupHandled = false;
args.entry.wakeOnDescendantSettle = undefined;
args.entry.fallbackFrozenResultText = undefined;
args.entry.fallbackFrozenResultCapturedAt = undefined;
const completion = ensureCompletionState(args.entry);
completion.fallbackResultText = undefined;
completion.fallbackCapturedAt = undefined;
params.resumedRuns.delete(args.runId);
safeSetSubagentTaskDeliveryStatus({
runId: args.runId,
childSessionKey: args.entry.childSessionKey,
deliveryStatus: "failed",
deliveryError: args.entry.lastAnnounceDeliveryError ?? args.reason,
deliveryError: getDeliveryLastError(args.entry) ?? args.reason,
});
safeMarkRequiredCompletionDeliveryBlocked({
entry: args.entry,
reason: args.entry.lastAnnounceDeliveryError ?? args.reason,
reason: getDeliveryLastError(args.entry) ?? args.reason,
});
logAnnounceGiveUp(args.entry, args.reason);
params.persist();
@@ -545,24 +565,29 @@ export function createSubagentRegistryLifecycleController(params: {
runId: giveUpParams.runId,
entry: giveUpParams.entry,
reason: giveUpParams.reason,
error: giveUpParams.entry.lastAnnounceDeliveryError,
error: getDeliveryLastError(giveUpParams.entry),
});
return;
}
const deliveryError = getDeliveryLastError(giveUpParams.entry) ?? giveUpParams.reason;
clearPendingFinalDelivery(giveUpParams.entry);
const failedDelivery = ensureDeliveryState(giveUpParams.entry);
failedDelivery.status = "failed";
failedDelivery.lastError = deliveryError;
safeSetSubagentTaskDeliveryStatus({
runId: giveUpParams.runId,
childSessionKey: giveUpParams.entry.childSessionKey,
deliveryStatus: "failed",
deliveryError: giveUpParams.entry.lastAnnounceDeliveryError,
deliveryError,
});
safeMarkRequiredCompletionDeliveryBlocked({
entry: giveUpParams.entry,
reason: giveUpParams.entry.lastAnnounceDeliveryError ?? giveUpParams.reason,
reason: deliveryError,
});
giveUpParams.entry.wakeOnDescendantSettle = undefined;
giveUpParams.entry.fallbackFrozenResultText = undefined;
giveUpParams.entry.fallbackFrozenResultCapturedAt = undefined;
const completion = ensureCompletionState(giveUpParams.entry);
completion.fallbackResultText = undefined;
completion.fallbackCapturedAt = undefined;
const shouldDeleteAttachments =
giveUpParams.entry.cleanup === "delete" || !giveUpParams.entry.retainAttachmentsOnKeep;
if (shouldDeleteAttachments) {
@@ -606,7 +631,7 @@ export function createSubagentRegistryLifecycleController(params: {
if (entry.cleanupCompletedAt || entry.cleanupHandled) {
continue;
}
if (entry.pendingFinalDelivery === true && typeof entry.deliverySuspendedAt === "number") {
if (isDeliverySuspended(entry)) {
continue;
}
if (params.suppressAnnounceForSteerRestart(entry)) {
@@ -645,6 +670,7 @@ export function createSubagentRegistryLifecycleController(params: {
cleanup: "delete" | "keep";
completedAt: number;
}) => {
void removeInternalSessionEffectsTranscript(cleanupParams.entry.execution?.transcriptFile);
if (cleanupParams.entry.spawnMode !== "session") {
void retireSessionMcpRuntimeForSessionKey({
sessionKey: cleanupParams.entry.childSessionKey,
@@ -720,14 +746,18 @@ export function createSubagentRegistryLifecycleController(params: {
}
if (didAnnounce) {
if (!options?.skipAnnounce) {
const deliveredAt = entry.completionDeliveredAt ?? Date.now();
entry.completionDeliveredAt = deliveredAt;
entry.completionAnnouncedAt = deliveredAt;
const delivery = ensureDeliveryState(entry);
const deliveredAt = delivery.deliveredAt ?? Date.now();
delivery.status = "delivered";
delivery.deliveredAt = deliveredAt;
delivery.announcedAt = deliveredAt;
params.persist();
}
clearPendingFinalDelivery(entry);
entry.deliverySuspendedAt = undefined;
entry.deliverySuspendedReason = undefined;
const delivery = ensureDeliveryState(entry);
delivery.status = "delivered";
delivery.suspendedAt = undefined;
delivery.suspendedReason = undefined;
if (!options?.skipDeliveryStatus) {
safeSetSubagentTaskDeliveryStatus({
runId,
@@ -735,11 +765,12 @@ export function createSubagentRegistryLifecycleController(params: {
deliveryStatus: "delivered",
});
}
entry.lastAnnounceDeliveryError = undefined;
entry.lastAnnounceDropReason = undefined;
delivery.lastError = undefined;
delivery.lastDropReason = undefined;
entry.wakeOnDescendantSettle = undefined;
entry.fallbackFrozenResultText = undefined;
entry.fallbackFrozenResultCapturedAt = undefined;
const completion = ensureCompletionState(entry);
completion.fallbackResultText = undefined;
completion.fallbackCapturedAt = undefined;
const completionReason = resolveCleanupCompletionReason(entry);
await emitCompletionEndedHookIfNeeded(entry, completionReason);
const shouldDeleteAttachments = cleanup === "delete" || !entry.retainAttachmentsOnKeep;
@@ -747,8 +778,8 @@ export function createSubagentRegistryLifecycleController(params: {
await safeRemoveAttachmentsDir(entry);
}
if (cleanup === "delete") {
entry.frozenResultText = undefined;
entry.frozenResultCapturedAt = undefined;
completion.resultText = undefined;
completion.capturedAt = undefined;
}
completeCleanupBookkeeping({
runId,
@@ -772,7 +803,7 @@ export function createSubagentRegistryLifecycleController(params: {
});
if (deferredDecision.kind === "defer-descendants") {
entry.lastAnnounceRetryAt = now;
ensureDeliveryState(entry).lastAttemptAt = now;
entry.wakeOnDescendantSettle = true;
entry.cleanupHandled = false;
params.resumedRuns.delete(runId);
@@ -781,35 +812,39 @@ export function createSubagentRegistryLifecycleController(params: {
return;
}
if (deferredDecision.retryCount != null) {
entry.announceRetryCount = deferredDecision.retryCount;
entry.lastAnnounceRetryAt = now;
}
if (deferredDecision.kind === "give-up") {
if (shouldSuspendPendingFinalDelivery(entry)) {
suspendPendingFinalDelivery({
runId,
entry,
reason: deferredDecision.reason,
error: entry.lastAnnounceDeliveryError,
error: getDeliveryLastError(entry),
});
return;
}
const deliveryError = getDeliveryLastError(entry) ?? deferredDecision.reason;
clearPendingFinalDelivery(entry);
const failedDelivery = ensureDeliveryState(entry);
failedDelivery.status = "failed";
failedDelivery.lastError = deliveryError;
if (deferredDecision.retryCount != null) {
failedDelivery.attemptCount = deferredDecision.retryCount;
failedDelivery.lastAttemptAt = now;
}
safeSetSubagentTaskDeliveryStatus({
runId,
childSessionKey: entry.childSessionKey,
deliveryStatus: "failed",
deliveryError: entry.lastAnnounceDeliveryError,
deliveryError,
});
safeMarkRequiredCompletionDeliveryBlocked({
entry,
reason: entry.lastAnnounceDeliveryError ?? deferredDecision.reason,
reason: deliveryError,
});
entry.wakeOnDescendantSettle = undefined;
entry.fallbackFrozenResultText = undefined;
entry.fallbackFrozenResultCapturedAt = undefined;
const completion = ensureCompletionState(entry);
completion.fallbackResultText = undefined;
completion.fallbackCapturedAt = undefined;
const shouldDeleteAttachments = cleanup === "delete" || !entry.retainAttachmentsOnKeep;
if (shouldDeleteAttachments) {
await safeRemoveAttachmentsDir(entry);
@@ -842,7 +877,7 @@ export function createSubagentRegistryLifecycleController(params: {
};
const startSubagentAnnounceCleanupFlow = (runId: string, entry: SubagentRunRecord): boolean => {
if (typeof entry.completionAnnouncedAt === "number") {
if (typeof entry.delivery?.announcedAt === "number" || entry.delivery?.status === "delivered") {
if (!beginSubagentCleanup(runId)) {
return false;
}
@@ -894,7 +929,7 @@ export function createSubagentRegistryLifecycleController(params: {
}
const pendingPayload = loadPendingFinalDeliveryPayload(entry);
const requesterOrigin = normalizeDeliveryContext(pendingPayload.requesterOrigin);
let latestDeliveryError = entry.lastAnnounceDeliveryError;
let latestDeliveryError = getDeliveryLastError(entry);
const finalizeAnnounceCleanup = async (didAnnounce: boolean) => {
const shouldCreditPriorDelivery =
!didAnnounce && (await hasPriorRequesterDeliveryMirror(entry));
@@ -902,7 +937,7 @@ export function createSubagentRegistryLifecycleController(params: {
latestDeliveryError = undefined;
}
if (!didAnnounce && latestDeliveryError) {
entry.lastAnnounceDeliveryError = latestDeliveryError;
ensureDeliveryState(entry).lastError = latestDeliveryError;
}
void finalizeSubagentCleanup(
runId,
@@ -942,19 +977,20 @@ export function createSubagentRegistryLifecycleController(params: {
onDeliveryResult: (delivery) => {
recordAnnounceDeliveryResult(entry, delivery);
if (delivery.delivered) {
if (entry.lastAnnounceDeliveryError !== undefined) {
entry.lastAnnounceDeliveryError = undefined;
const deliveryState = ensureDeliveryState(entry);
if (deliveryState.lastError !== undefined) {
deliveryState.lastError = undefined;
params.persist();
}
latestDeliveryError = undefined;
return;
}
if (delivery.path === "none") {
entry.lastAnnounceDropReason = "sink_unavailable";
ensureDeliveryState(entry).lastDropReason = "sink_unavailable";
}
latestDeliveryError = formatAnnounceDeliveryError(delivery);
if (entry.lastAnnounceDeliveryError !== latestDeliveryError) {
entry.lastAnnounceDeliveryError = latestDeliveryError;
if (ensureDeliveryState(entry).lastError !== latestDeliveryError) {
ensureDeliveryState(entry).lastError = latestDeliveryError;
params.persist();
}
},
@@ -995,7 +1031,7 @@ export function createSubagentRegistryLifecycleController(params: {
entry.suppressAnnounceReason = undefined;
entry.cleanupHandled = false;
entry.cleanupCompletedAt = undefined;
entry.completionAnnouncedAt = undefined;
ensureDeliveryState(entry).announcedAt = undefined;
mutated = true;
}
@@ -1003,6 +1039,12 @@ export function createSubagentRegistryLifecycleController(params: {
typeof completeParams.endedAt === "number" ? completeParams.endedAt : Date.now();
if (entry.endedAt !== endedAt) {
entry.endedAt = endedAt;
entry.execution = {
...entry.execution,
status: "terminal",
startedAt: entry.startedAt,
endedAt,
};
mutated = true;
}
const outcome = withSubagentOutcomeTiming(completeParams.outcome, {
@@ -1013,6 +1055,20 @@ export function createSubagentRegistryLifecycleController(params: {
entry.outcome = outcome;
mutated = true;
}
if (
entry.execution?.status !== "terminal" ||
entry.execution.endedAt !== endedAt ||
entry.execution.outcome !== outcome
) {
entry.execution = {
...entry.execution,
status: "terminal",
startedAt: entry.startedAt,
endedAt,
outcome,
};
mutated = true;
}
if (entry.endedReason !== completeParams.reason) {
entry.endedReason = completeParams.reason;
mutated = true;

View File

@@ -1,4 +1,5 @@
import { registerSessionMaintenancePreserveKeysProvider } from "../config/sessions/store-maintenance-preserve.js";
import { isDeliverySuspended } from "./subagent-delivery-state.js";
import { subagentRuns } from "./subagent-registry-memory.js";
import { getSubagentRunsSnapshotForRead } from "./subagent-registry-state.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
@@ -12,11 +13,11 @@ function isActiveForMaintenance(entry: SubagentRunRecord): boolean {
}
/**
 * Reports whether a run still has a final delivery outstanding, judged from
 * the nested `entry.delivery` state.
 *
 * A run counts as outstanding when its delivery status is "pending" or when
 * `isDeliverySuspended` reports it as suspended. The legacy flat
 * `pendingFinalDelivery` flag is not consulted; previously an early return on
 * that flag made the delivery-state check unreachable.
 *
 * @param entry - Run record to inspect.
 * @returns `true` when delivery is pending or suspended.
 */
function isPendingFinalDeliveryForMaintenance(entry: SubagentRunRecord): boolean {
  return entry.delivery?.status === "pending" || isDeliverySuspended(entry);
}
/**
 * Reports whether a run that expects a completion message has not yet had it
 * delivered, judged from the nested `entry.delivery` state.
 *
 * The legacy flat `completionAnnouncedAt` timestamp is not consulted;
 * previously an early return on that field made the delivery-status check
 * unreachable.
 *
 * @param entry - Run record to inspect.
 * @returns `true` only when `expectsCompletionMessage` is exactly `true` and
 *   the delivery status is anything other than "delivered" (including a
 *   missing delivery state).
 */
function isAwaitingCompletionAnnounceForMaintenance(entry: SubagentRunRecord): boolean {
  if (entry.expectsCompletionMessage !== true) {
    return false;
  }
  return entry.delivery?.status !== "delivered";
}
function shouldPreserveForMaintenance(entry: SubagentRunRecord): boolean {

View File

@@ -7,9 +7,11 @@ import { formatBlockedLivenessError, isBlockedLivenessState } from "../shared/ag
import { createRunningTaskRun } from "../tasks/detached-task-runtime.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js";
import type { DeliveryContext } from "../utils/delivery-context.types.js";
import { removeInternalSessionEffectsTranscript } from "./internal-session-effects.js";
import { isRecoverableAgentWaitError, waitForAgentRun } from "./run-wait.js";
import type { ensureRuntimePluginsLoaded as ensureRuntimePluginsLoadedFn } from "./runtime-plugins.js";
import { type SubagentRunOutcome, withSubagentOutcomeTiming } from "./subagent-announce-output.js";
import { ensureCompletionState, normalizeSubagentRunState } from "./subagent-delivery-state.js";
import {
SUBAGENT_ENDED_OUTCOME_KILLED,
SUBAGENT_ENDED_REASON_COMPLETE,
@@ -74,9 +76,10 @@ export function markSubagentRunPausedAfterYield(params: {
entry.cleanupHandled = false;
mutated = true;
}
if (entry.frozenResultText !== undefined) {
entry.frozenResultText = undefined;
entry.frozenResultCapturedAt = undefined;
const completion = ensureCompletionState(entry);
if (completion.resultText !== undefined) {
completion.resultText = undefined;
completion.capturedAt = undefined;
mutated = true;
}
return mutated;
@@ -368,6 +371,7 @@ export function createSubagentRunManager(params: {
fallback?: SubagentRunRecord;
runTimeoutSeconds?: number;
preserveFrozenResultFallback?: boolean;
transcriptFile?: string;
}) => {
const previousRunId = replaceParams.previousRunId.trim();
const nextRunId = replaceParams.nextRunId.trim();
@@ -386,6 +390,12 @@ export function createSubagentRunManager(params: {
if (shouldDeleteAttachments(source)) {
void safeRemoveAttachmentsDir(source);
}
if (
source.execution?.transcriptFile &&
source.execution.transcriptFile !== replaceParams.transcriptFile
) {
void removeInternalSessionEffectsTranscript(source.execution.transcriptFile);
}
params.runs.delete(previousRunId);
params.resumedRuns.delete(previousRunId);
}
@@ -410,7 +420,8 @@ export function createSubagentRunManager(params: {
typeof source.endedAt === "number" ? source.endedAt : now,
) ?? 0;
const next: SubagentRunRecord = {
const sourceCompletion = ensureCompletionState(source);
const next: SubagentRunRecord = normalizeSubagentRunState({
...source,
runId: nextRunId,
createdAt: now,
@@ -423,37 +434,26 @@ export function createSubagentRunManager(params: {
endedHookEmittedAt: undefined,
wakeOnDescendantSettle: undefined,
outcome: undefined,
frozenResultText: undefined,
frozenResultCapturedAt: undefined,
fallbackFrozenResultText: preserveFrozenResultFallback ? source.frozenResultText : undefined,
fallbackFrozenResultCapturedAt: preserveFrozenResultFallback
? source.frozenResultCapturedAt
: undefined,
execution: {
status: "running",
startedAt: now,
transcriptFile: replaceParams.transcriptFile,
},
completion: {
required: source.expectsCompletionMessage === true,
fallbackResultText: preserveFrozenResultFallback ? sourceCompletion.resultText : undefined,
fallbackCapturedAt: preserveFrozenResultFallback ? sourceCompletion.capturedAt : undefined,
},
cleanupCompletedAt: undefined,
cleanupHandled: false,
completionEnqueuedAt: undefined,
completionDeliveredAt: undefined,
completionAnnouncedAt: undefined,
lastAnnounceDropReason: undefined,
suppressAnnounceReason: undefined,
announceRetryCount: undefined,
lastAnnounceRetryAt: undefined,
lastAnnounceDeliveryError: undefined,
pendingFinalDelivery: undefined,
pendingFinalDeliveryCreatedAt: undefined,
pendingFinalDeliveryLastAttemptAt: undefined,
pendingFinalDeliveryAttemptCount: undefined,
pendingFinalDeliveryLastError: undefined,
pendingFinalDeliveryPayload: undefined,
deliverySuspendedAt: undefined,
deliverySuspendedReason: undefined,
deliveryDiscardedAt: undefined,
deliveryDiscardReason: undefined,
deliveryDiscardedPayloadSummary: undefined,
delivery: {
status: source.expectsCompletionMessage === false ? "not_required" : "pending",
},
spawnMode,
archiveAtMs,
runTimeoutSeconds,
};
});
params.runs.set(nextRunId, next);
params.ensureListener();
@@ -485,7 +485,7 @@ export function createSubagentRunManager(params: {
const runTimeoutSeconds = registerParams.runTimeoutSeconds ?? 0;
const waitTimeoutMs = params.resolveSubagentWaitTimeoutMs(cfg, runTimeoutSeconds);
const requesterOrigin = normalizeDeliveryContext(registerParams.requesterOrigin);
const entry: SubagentRunRecord = {
const entry: SubagentRunRecord = normalizeSubagentRunState({
runId,
childSessionKey,
controllerSessionKey,
@@ -504,16 +504,25 @@ export function createSubagentRunManager(params: {
runTimeoutSeconds,
createdAt: now,
startedAt: now,
execution: {
status: "running",
startedAt: now,
},
completion: {
required: registerParams.expectsCompletionMessage === true,
},
delivery: {
status: registerParams.expectsCompletionMessage === false ? "not_required" : "pending",
},
sessionStartedAt: now,
accumulatedRuntimeMs: 0,
archiveAtMs,
cleanupHandled: false,
completionAnnouncedAt: undefined,
wakeOnDescendantSettle: undefined,
attachmentsDir: registerParams.attachmentsDir,
attachmentsRootDir: registerParams.attachmentsRootDir,
retainAttachmentsOnKeep: registerParams.retainAttachmentsOnKeep,
};
});
params.runs.set(runId, entry);
try {
params.persistOrThrow();

View File

@@ -6,6 +6,7 @@ type ReplaceSubagentRunAfterSteerParams = {
fallback?: SubagentRunRecord;
runTimeoutSeconds?: number;
preserveFrozenResultFallback?: boolean;
transcriptFile?: string;
};
/**
 * Call signature for the replace-subagent-run-after-steer operation: accepts a
 * `ReplaceSubagentRunAfterSteerParams` bag and returns a boolean.
 * NOTE(review): presumably `true` means the run was replaced — confirm against the implementation.
 */
type ReplaceSubagentRunAfterSteerFn = (params: ReplaceSubagentRunAfterSteerParams) => boolean;

View File

@@ -150,14 +150,13 @@ describe("announce loop guard (#18264)", () => {
createdAt: now - 60_000,
startedAt: now - 55_000,
endedAt: now - 50_000,
announceRetryCount: 3,
lastAnnounceRetryAt: now - 10_000,
delivery: { status: "pending", attemptCount: 3, lastAttemptAt: now - 10_000 },
});
const runs = registry.listSubagentRunsForRequester("agent:main:main");
const entry = requireRunById(runs, "test-loop-guard");
expect(entry.announceRetryCount).toBe(3);
expect(entry.lastAnnounceRetryAt).toBe(now - 10_000);
expect(entry.delivery?.attemptCount).toBe(3);
expect(entry.delivery?.lastAttemptAt).toBe(now - 10_000);
});
test.each([
@@ -175,8 +174,7 @@ describe("announce loop guard (#18264)", () => {
startedAt: now - 14 * 60_000,
endedAt: now - 10 * 60_000,
cleanupCompletedAt: undefined,
announceRetryCount: 3,
lastAnnounceRetryAt: now - 9 * 60_000,
delivery: { status: "pending" as const, attemptCount: 3, lastAttemptAt: now - 9 * 60_000 },
}),
},
{
@@ -192,8 +190,7 @@ describe("announce loop guard (#18264)", () => {
startedAt: now - 90_000,
endedAt: now - 60_000,
cleanupCompletedAt: undefined,
announceRetryCount: 3,
lastAnnounceRetryAt: now - 30_000,
delivery: { status: "pending" as const, attemptCount: 3, lastAttemptAt: now - 30_000 },
}),
},
])("$name", async ({ createEntry }) => {
@@ -278,9 +275,9 @@ describe("announce loop guard (#18264)", () => {
const stored = await waitForRun(
runId,
(run) => run.cleanupHandled === false && run.announceRetryCount === 1,
(run) => run.cleanupHandled === false && run.delivery?.attemptCount === 1,
);
expect(stored.cleanupCompletedAt).toBeUndefined();
expect(stored.lastAnnounceRetryAt).toBeTypeOf("number");
expect(stored.delivery?.lastAttemptAt).toBeTypeOf("number");
});
});

View File

@@ -249,7 +249,7 @@ describe("subagent registry lifecycle error grace", () => {
const run = mod
.listSubagentRunsForRequester(MAIN_REQUESTER_SESSION_KEY)
.find((candidate) => candidate.runId === runId);
if (run?.frozenResultText === expectedText) {
if (run?.completion?.resultText === expectedText) {
return run;
}
await vi.advanceTimersByTimeAsync(1);
@@ -432,7 +432,7 @@ describe("subagent registry lifecycle error grace", () => {
const runBeforeRefresh = mod
.listSubagentRunsForRequester(MAIN_REQUESTER_SESSION_KEY)
.find((candidate) => candidate.runId === "run-refresh");
const firstCapturedAt = runBeforeRefresh?.frozenResultCapturedAt ?? 0;
const firstCapturedAt = runBeforeRefresh?.completion?.capturedAt ?? 0;
setAssistantOutput(
"agent:main:subagent:refresh",
@@ -447,10 +447,10 @@ describe("subagent registry lifecycle error grace", () => {
"run-refresh",
"All 3 subagents complete. Here's the final summary.",
);
expect(runAfterRefresh?.frozenResultText).toBe(
expect(runAfterRefresh?.completion?.resultText).toBe(
"All 3 subagents complete. Here's the final summary.",
);
expect((runAfterRefresh?.frozenResultCapturedAt ?? 0) >= firstCapturedAt).toBe(true);
expect((runAfterRefresh?.completion?.capturedAt ?? 0) >= firstCapturedAt).toBe(true);
emitLifecycleEvent("run-refresh", { phase: "end", endedAt: endedAt + 300 });
await flushAsync();
@@ -484,7 +484,7 @@ describe("subagent registry lifecycle error grace", () => {
const runAfterSilent = mod
.listSubagentRunsForRequester(MAIN_REQUESTER_SESSION_KEY)
.find((candidate) => candidate.runId === "run-refresh-silent");
expect(runAfterSilent?.frozenResultText).toBe("All work complete, final summary");
expect(runAfterSilent?.completion?.resultText).toBe("All work complete, final summary");
emitLifecycleEvent("run-refresh-silent", { phase: "end", endedAt: endedAt + 300 });
await flushAsync();
@@ -516,9 +516,11 @@ describe("subagent registry lifecycle error grace", () => {
throw new Error("expected capped run to exist");
}
expect(run.runId).toBe("run-capped");
expect(typeof run.frozenResultText).toBe("string");
expect(run.frozenResultText).toContain("[truncated: frozen completion output exceeded 100KB");
expect(run.frozenResultCapturedAt).toBeTypeOf("number");
expect(typeof run.completion?.resultText).toBe("string");
expect(run.completion?.resultText).toContain(
"[truncated: frozen completion output exceeded 100KB",
);
expect(run.completion?.capturedAt).toBeTypeOf("number");
});
it("keeps parallel child completion results frozen even when late traffic arrives", async () => {

View File

@@ -67,6 +67,7 @@ vi.mock("../config/sessions.js", () => {
const announceSpy = vi.fn(async (_params: unknown) => true);
const runSubagentEndedHookMock = vi.fn(async (eventValue?: unknown, _ctx?: unknown) => {});
const emitSessionLifecycleEventMock = vi.fn();
const removeInternalSessionEffectsTranscriptMock = vi.fn(async (_sessionFile?: string) => {});
function countMatching<T>(items: readonly T[], predicate: (item: T) => boolean) {
let count = 0;
@@ -153,6 +154,10 @@ vi.mock("./subagent-registry.store.js", () => ({
saveSubagentRegistryToDisk: vi.fn(() => {}),
}));
vi.mock("./internal-session-effects.js", () => ({
removeInternalSessionEffectsTranscript: removeInternalSessionEffectsTranscriptMock,
}));
describe("subagent registry steer restarts", () => {
let mod: typeof import("./subagent-registry.js");
type RegisterSubagentRunInput = Parameters<typeof mod.registerSubagentRun>[0];
@@ -176,6 +181,7 @@ describe("subagent registry steer restarts", () => {
runSubagentEndedHookMock.mockReset();
runSubagentEndedHookMock.mockImplementation(async () => {});
emitSessionLifecycleEventMock.mockReset();
removeInternalSessionEffectsTranscriptMock.mockClear();
mod.resetSubagentRegistryForTests({ persist: false });
});
@@ -271,11 +277,13 @@ describe("subagent registry steer restarts", () => {
previousRunId: string;
nextRunId: string;
fallback?: ReturnType<typeof listMainRuns>[number];
transcriptFile?: string;
}) => {
const replaced = mod.replaceSubagentRunAfterSteer({
previousRunId: params.previousRunId,
nextRunId: params.nextRunId,
fallback: params.fallback,
transcriptFile: params.transcriptFile,
});
expect(replaced).toBe(true);
@@ -294,6 +302,7 @@ describe("subagent registry steer restarts", () => {
runSubagentEndedHookMock.mockImplementation(async () => {});
emitSessionLifecycleEventMock.mockReset();
lifecycleHandler = undefined;
removeInternalSessionEffectsTranscriptMock.mockClear();
mod.resetSubagentRegistryForTests({ persist: false });
});
@@ -345,6 +354,37 @@ describe("subagent registry steer restarts", () => {
}
});
// Verifies that replacing a run after steer removes the transcript file recorded on the
// previous run's `execution` state.
it("removes orphaned private transcript when steer replaces an internally resumed run", async () => {
{
// Register the run that will later be replaced.
registerRun({
runId: "run-old",
childSessionKey: "agent:main:subagent:steer",
task: "initial task",
});
const previous = listMainRuns()[0];
expect(previous?.runId).toBe("run-old");
if (!previous) {
throw new Error("expected registered subagent run");
}
// Mark the old run as interrupted and attach a private transcript path to it.
previous.execution = {
status: "interrupted",
startedAt: previous.startedAt,
transcriptFile: "/tmp/openclaw-state/internal-agent-runs/run-old.jsonl",
};
// No `transcriptFile` is passed for the new run; only the old run's record carries the path.
replaceRunAfterSteer({
previousRunId: "run-old",
nextRunId: "run-new",
fallback: previous,
});
// The mocked remover must have been called with the old run's transcript path.
expect(removeInternalSessionEffectsTranscriptMock).toHaveBeenCalledWith(
"/tmp/openclaw-state/internal-agent-runs/run-old.jsonl",
);
}
});
it("defers subagent_ended hook for completion-mode runs until announce delivery resolves", async () => {
{
const resolveAnnounce = createDeferredAnnounceResolver();
@@ -411,8 +451,7 @@ describe("subagent registry steer restarts", () => {
const previous = listMainRuns()[0];
expect(previous?.runId).toBe("run-retry-reset-old");
if (previous) {
previous.announceRetryCount = 2;
previous.lastAnnounceRetryAt = Date.now();
previous.delivery = { status: "pending", attemptCount: 2, lastAttemptAt: Date.now() };
}
const run = replaceRunAfterSteer({
@@ -420,8 +459,8 @@ describe("subagent registry steer restarts", () => {
nextRunId: "run-retry-reset-new",
fallback: previous,
});
expect(run.announceRetryCount).toBeUndefined();
expect(run.lastAnnounceRetryAt).toBeUndefined();
expect(run.delivery?.attemptCount).toBeUndefined();
expect(run.delivery?.lastAttemptAt).toBeUndefined();
}
});
@@ -473,8 +512,11 @@ describe("subagent registry steer restarts", () => {
const previous = listMainRuns()[0];
expect(previous?.runId).toBe("run-frozen-old");
if (previous) {
previous.frozenResultText = "stale frozen completion";
previous.frozenResultCapturedAt = Date.now();
previous.completion = {
required: true,
resultText: "stale frozen completion",
capturedAt: Date.now(),
};
previous.cleanupCompletedAt = Date.now();
previous.cleanupHandled = true;
}
@@ -485,8 +527,8 @@ describe("subagent registry steer restarts", () => {
fallback: previous,
});
expect(run.frozenResultText).toBeUndefined();
expect(run.frozenResultCapturedAt).toBeUndefined();
expect(run.completion?.resultText).toBeUndefined();
expect(run.completion?.capturedAt).toBeUndefined();
expect(run.cleanupCompletedAt).toBeUndefined();
expect(run.cleanupHandled).toBe(false);
});
@@ -543,10 +585,13 @@ describe("subagent registry steer restarts", () => {
if (!previous) {
throw new Error("missing previous run");
}
previous.completionEnqueuedAt = 1_000;
previous.completionDeliveredAt = 2_000;
previous.completionAnnouncedAt = 2_000;
previous.lastAnnounceDropReason = "sink_unavailable";
previous.delivery = {
status: "delivered",
enqueuedAt: 1_000,
deliveredAt: 2_000,
announcedAt: 2_000,
lastDropReason: "sink_unavailable",
};
const replaced = mod.replaceSubagentRunAfterSteer({
previousRunId: "run-delivery-old",
@@ -559,10 +604,10 @@ describe("subagent registry steer restarts", () => {
if (!next) {
throw new Error("expected replacement run");
}
expect(next.completionEnqueuedAt).toBeUndefined();
expect(next.completionDeliveredAt).toBeUndefined();
expect(next.completionAnnouncedAt).toBeUndefined();
expect(next.lastAnnounceDropReason).toBeUndefined();
expect(next.delivery?.enqueuedAt).toBeUndefined();
expect(next.delivery?.deliveredAt).toBeUndefined();
expect(next.delivery?.announcedAt).toBeUndefined();
expect(next.delivery?.lastDropReason).toBeUndefined();
});
it("preserves frozen completion as fallback when replacing for wake continuation", () => {
@@ -575,8 +620,11 @@ describe("subagent registry steer restarts", () => {
const previous = listMainRuns()[0];
expect(previous?.runId).toBe("run-wake-old");
if (previous) {
previous.frozenResultText = "final summary before wake";
previous.frozenResultCapturedAt = 1234;
previous.completion = {
required: true,
resultText: "final summary before wake",
capturedAt: 1234,
};
}
const replaced = mod.replaceSubagentRunAfterSteer({
@@ -591,9 +639,9 @@ describe("subagent registry steer restarts", () => {
if (!run) {
throw new Error("expected wake replacement run");
}
expect(run.frozenResultText).toBeUndefined();
expect(run.fallbackFrozenResultText).toBe("final summary before wake");
expect(run.fallbackFrozenResultCapturedAt).toBe(1234);
expect(run.completion?.resultText).toBeUndefined();
expect(run.completion?.fallbackResultText).toBe("final summary before wake");
expect(run.completion?.fallbackCapturedAt).toBe(1234);
});
it("restores announce for a finished run when steer replacement dispatch fails", async () => {
@@ -786,27 +834,27 @@ describe("subagent registry steer restarts", () => {
await vi.advanceTimersByTimeAsync(0);
expect(announceSpy).toHaveBeenCalledTimes(1);
expect(listMainRuns()[0]?.announceRetryCount).toBe(1);
expect(listMainRuns()[0]?.delivery?.attemptCount).toBe(1);
await vi.advanceTimersByTimeAsync(999);
expect(announceSpy).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(1);
expect(announceSpy).toHaveBeenCalledTimes(2);
expect(listMainRuns()[0]?.announceRetryCount).toBe(2);
expect(listMainRuns()[0]?.delivery?.attemptCount).toBe(2);
await vi.advanceTimersByTimeAsync(1_999);
expect(announceSpy).toHaveBeenCalledTimes(2);
await vi.advanceTimersByTimeAsync(1);
expect(announceSpy).toHaveBeenCalledTimes(3);
expect(listMainRuns()[0]?.announceRetryCount).toBe(3);
expect(listMainRuns()[0]?.delivery?.attemptCount).toBe(3);
await vi.advanceTimersByTimeAsync(4_001);
expect(announceSpy).toHaveBeenCalledTimes(3);
await waitForRegistrySideEffect(() => {
const run = listMainRuns()[0];
expect(run?.pendingFinalDelivery).toBe(true);
expect(run?.deliverySuspendedAt).toBeTypeOf("number");
expect(run?.deliverySuspendedReason).toBe("retry-limit");
expect(run?.delivery?.status).toBe("suspended");
expect(run?.delivery?.suspendedAt).toBeTypeOf("number");
expect(run?.delivery?.suspendedReason).toBe("retry-limit");
expect(run?.cleanupCompletedAt).toBeUndefined();
});
} finally {

View File

@@ -5,6 +5,7 @@ import { resolveStateDir } from "../config/paths.js";
import { loadJsonFile, saveJsonFile } from "../infra/json-file.js";
import { readStringValue } from "../shared/string-coerce.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js";
import { normalizeSubagentRunState } from "./subagent-delivery-state.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
type PersistedSubagentRegistryV1 = {
@@ -148,16 +149,19 @@ export function loadSubagentRegistryFromDisk(): Map<string, SubagentRunRecord> {
requesterAccountId: _accountId,
...rest
} = typed;
out.set(runId, {
...rest,
childSessionKey,
requesterSessionKey,
controllerSessionKey,
requesterOrigin,
cleanupCompletedAt,
cleanupHandled,
spawnMode: typed.spawnMode === "session" ? "session" : "run",
});
out.set(
runId,
normalizeSubagentRunState({
...rest,
childSessionKey,
requesterSessionKey,
controllerSessionKey,
requesterOrigin,
cleanupCompletedAt,
cleanupHandled,
spawnMode: typed.spawnMode === "session" ? "session" : "run",
}),
);
if (isLegacy) {
migrated = true;
}
@@ -178,7 +182,7 @@ export function saveSubagentRegistryToDisk(runs: Map<string, SubagentRunRecord>)
const pathname = resolveSubagentRegistryPath();
const serialized: Record<string, PersistedSubagentRunRecord> = {};
for (const [runId, entry] of runs.entries()) {
serialized[runId] = entry;
serialized[runId] = normalizeSubagentRunState(cloneSubagentRunRecord(entry));
}
const out: PersistedSubagentRegistry = {
version: REGISTRY_VERSION,

View File

@@ -252,8 +252,8 @@ describe("subagent registry seam flow", () => {
expectsCompletionMessage: true,
createdAt: now - 2,
endedAt: now - 1,
pendingFinalDelivery: true,
frozenResultText: "child output",
completion: { required: true, resultText: "child output" },
delivery: { status: "pending" },
});
mod.addSubagentRunForTests({
runId: "run-complete",
@@ -265,7 +265,7 @@ describe("subagent registry seam flow", () => {
expectsCompletionMessage: true,
createdAt: now - 4,
endedAt: now - 3,
completionAnnouncedAt: now - 2,
delivery: { status: "delivered", announcedAt: now - 2, deliveredAt: now - 2 },
cleanupCompletedAt: now - 1,
});
@@ -1098,8 +1098,11 @@ describe("subagent registry seam flow", () => {
startedAt: Date.parse("2026-03-24T11:59:00Z"),
endedAt: Date.parse("2026-03-24T11:59:30Z"),
expectsCompletionMessage: true,
announceRetryCount: 3,
lastAnnounceRetryAt: Date.parse("2026-03-24T11:59:40Z"),
delivery: {
status: "pending",
attemptCount: 3,
lastAttemptAt: Date.parse("2026-03-24T11:59:40Z"),
},
});
return 1;
}) as never);
@@ -1144,21 +1147,23 @@ describe("subagent registry seam flow", () => {
endedReason: "subagent-complete",
expectsCompletionMessage: true,
outcome: { status: "ok" },
announceRetryCount: 3,
lastAnnounceRetryAt: Date.parse("2026-03-24T11:59:40Z"),
lastAnnounceDeliveryError: "gateway request timeout for agent",
frozenResultText: "child completed successfully",
pendingFinalDelivery: true,
pendingFinalDeliveryPayload: {
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
childSessionKey: "agent:main:subagent:child",
childRunId: "run-resume-keep",
task: "resume keep retry budget",
endedAt: Date.parse("2026-03-24T11:59:30Z"),
outcome: { status: "ok" },
expectsCompletionMessage: true,
frozenResultText: "child completed successfully",
completion: { required: true, resultText: "child completed successfully" },
delivery: {
status: "pending",
attemptCount: 3,
lastAttemptAt: Date.parse("2026-03-24T11:59:40Z"),
lastError: "gateway request timeout for agent",
payload: {
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
childSessionKey: "agent:main:subagent:child",
childRunId: "run-resume-keep",
task: "resume keep retry budget",
endedAt: Date.parse("2026-03-24T11:59:30Z"),
outcome: { status: "ok" },
expectsCompletionMessage: true,
frozenResultText: "child completed successfully",
},
},
});
return 1;
@@ -1173,12 +1178,14 @@ describe("subagent registry seam flow", () => {
.listSubagentRunsForRequester("agent:main:main")
.find((entry) => entry.runId === "run-resume-keep");
expect(run).toMatchObject({
pendingFinalDelivery: true,
deliverySuspendedReason: "retry-limit",
delivery: {
status: "suspended",
suspendedReason: "retry-limit",
},
cleanupHandled: false,
});
expect(run?.cleanupCompletedAt).toBeUndefined();
expect(run?.pendingFinalDeliveryPayload).toMatchObject({
expect(run?.delivery?.payload).toMatchObject({
childRunId: "run-resume-keep",
frozenResultText: "child completed successfully",
});
@@ -1199,27 +1206,26 @@ describe("subagent registry seam flow", () => {
endedAt,
endedReason: "subagent-complete",
outcome: { status: "ok" },
announceRetryCount: 3,
lastAnnounceRetryAt: endedAt + 1_000,
lastAnnounceDeliveryError: "gateway request timeout for agent",
pendingFinalDelivery: true,
pendingFinalDeliveryCreatedAt: endedAt + 1_000,
pendingFinalDeliveryLastAttemptAt: endedAt + 2_000,
pendingFinalDeliveryAttemptCount: 3,
pendingFinalDeliveryLastError: "gateway request timeout for agent",
pendingFinalDeliveryPayload: {
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
childSessionKey: "agent:main:subagent:reactivated",
childRunId: "run-suspended-old",
task: "reactivate suspended delivery",
endedAt,
outcome: { status: "ok" },
expectsCompletionMessage: true,
frozenResultText: "child completed successfully",
delivery: {
status: "suspended",
createdAt: endedAt + 1_000,
lastAttemptAt: endedAt + 2_000,
attemptCount: 3,
lastError: "gateway request timeout for agent",
payload: {
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
childSessionKey: "agent:main:subagent:reactivated",
childRunId: "run-suspended-old",
task: "reactivate suspended delivery",
endedAt,
outcome: { status: "ok" },
expectsCompletionMessage: true,
frozenResultText: "child completed successfully",
},
suspendedAt: endedAt + 3_000,
suspendedReason: "retry-limit",
},
deliverySuspendedAt: endedAt + 3_000,
deliverySuspendedReason: "retry-limit",
});
expect(
@@ -1238,11 +1244,10 @@ describe("subagent registry seam flow", () => {
cleanupHandled: false,
});
expect(replacement?.endedAt).toBeUndefined();
expect(replacement?.lastAnnounceDeliveryError).toBeUndefined();
expect(replacement?.pendingFinalDelivery).toBeUndefined();
expect(replacement?.pendingFinalDeliveryPayload).toBeUndefined();
expect(replacement?.deliverySuspendedAt).toBeUndefined();
expect(replacement?.deliverySuspendedReason).toBeUndefined();
expect(replacement?.delivery?.lastError).toBeUndefined();
expect(replacement?.delivery?.payload).toBeUndefined();
expect(replacement?.delivery?.suspendedAt).toBeUndefined();
expect(replacement?.delivery?.suspendedReason).toBeUndefined();
});
it("finalizes expired delete-mode parents when descendant cleanup retriggers deferred announce handling", async () => {
@@ -1699,25 +1704,26 @@ describe("subagent registry seam flow", () => {
startedAt: now - 3 * 60 * 60_000,
endedAt: now - 3 * 60 * 60_000,
outcome: { status: "ok" },
pendingFinalDelivery: true,
pendingFinalDeliveryCreatedAt: now - 3 * 60 * 60_000,
pendingFinalDeliveryLastAttemptAt: now - 2 * 60 * 60_000 - 1,
pendingFinalDeliveryAttemptCount: 3,
pendingFinalDeliveryLastError: "gateway request timeout for agent",
pendingFinalDeliveryPayload: {
requesterSessionKey: "agent:main:cron:cron-1:run:parent",
requesterDisplayKey: "cron",
childSessionKey: "agent:main:subagent:suspended-cron",
childRunId: runId,
task: "cron suspended delivery",
endedAt: now - 3 * 60 * 60_000,
outcome: { status: "ok" },
expectsCompletionMessage: true,
frozenResultText: "large final payload",
delivery: {
status: "suspended",
createdAt: now - 3 * 60 * 60_000,
lastAttemptAt: now - 2 * 60 * 60_000 - 1,
attemptCount: 3,
lastError: "gateway request timeout for agent",
payload: {
requesterSessionKey: "agent:main:cron:cron-1:run:parent",
requesterDisplayKey: "cron",
childSessionKey: "agent:main:subagent:suspended-cron",
childRunId: runId,
task: "cron suspended delivery",
endedAt: now - 3 * 60 * 60_000,
outcome: { status: "ok" },
expectsCompletionMessage: true,
frozenResultText: "large final payload",
},
suspendedAt: now - 2 * 60 * 60_000 - 1,
suspendedReason: "retry-limit",
},
deliverySuspendedAt: now - 2 * 60 * 60_000 - 1,
deliverySuspendedReason: "retry-limit",
lastAnnounceDeliveryError: "gateway request timeout for agent",
});
await mod.testing.sweepOnceForTests();
@@ -1725,16 +1731,18 @@ describe("subagent registry seam flow", () => {
const run = mod.getSubagentRunByChildSessionKey("agent:main:subagent:suspended-cron");
expect(run).toMatchObject({
runId,
pendingFinalDelivery: undefined,
pendingFinalDeliveryPayload: undefined,
deliverySuspendedAt: undefined,
deliverySuspendedReason: undefined,
deliveryDiscardedAt: now,
deliveryDiscardReason: "expired",
delivery: {
status: "discarded",
payload: undefined,
suspendedAt: undefined,
suspendedReason: undefined,
discardedAt: now,
discardReason: "expired",
},
cleanupHandled: true,
cleanupCompletedAt: now,
});
expect(run?.deliveryDiscardedPayloadSummary).toEqual({
expect(run?.delivery?.discardedPayloadSummary).toEqual({
requesterSessionKey: "agent:main:cron:cron-1:run:parent",
childSessionKey: "agent:main:subagent:suspended-cron",
childRunId: runId,
@@ -1770,24 +1778,26 @@ describe("subagent registry seam flow", () => {
startedAt: now - 60_000,
endedAt: now - 60_000,
outcome: { status: "ok" },
pendingFinalDelivery: true,
pendingFinalDeliveryCreatedAt: now - 60_000,
pendingFinalDeliveryLastAttemptAt: now - 60_000 + i,
pendingFinalDeliveryAttemptCount: 3,
pendingFinalDeliveryLastError: "gateway request timeout for agent",
pendingFinalDeliveryPayload: {
requesterSessionKey: "agent:main:telegram:direct:418181497",
requesterDisplayKey: "telegram",
childSessionKey: `agent:main:subagent:suspended-pressure-${i}`,
childRunId: runId,
task: "interactive suspended delivery",
endedAt: now - 60_000,
outcome: { status: "ok" },
expectsCompletionMessage: true,
frozenResultText: "final payload",
delivery: {
status: "suspended",
createdAt: now - 60_000,
lastAttemptAt: now - 60_000 + i,
attemptCount: 3,
lastError: "gateway request timeout for agent",
payload: {
requesterSessionKey: "agent:main:telegram:direct:418181497",
requesterDisplayKey: "telegram",
childSessionKey: `agent:main:subagent:suspended-pressure-${i}`,
childRunId: runId,
task: "interactive suspended delivery",
endedAt: now - 60_000,
outcome: { status: "ok" },
expectsCompletionMessage: true,
frozenResultText: "final payload",
},
suspendedAt: now - 60_000 + i,
suspendedReason: "retry-limit",
},
deliverySuspendedAt: now - 60_000 + i,
deliverySuspendedReason: "retry-limit",
});
}
@@ -1796,15 +1806,16 @@ describe("subagent registry seam flow", () => {
const runs = Array.from({ length: 51 }, (_, i) =>
mod.getSubagentRunByChildSessionKey(`agent:main:subagent:suspended-pressure-${i}`),
);
const discarded = runs.filter((run) => run?.deliveryDiscardReason === "pressure-pruned");
const discarded = runs.filter((run) => run?.delivery?.discardReason === "pressure-pruned");
const stillSuspended = runs.filter(
(run) => run?.pendingFinalDelivery === true && typeof run.deliverySuspendedAt === "number",
(run) =>
run?.delivery?.status === "suspended" && typeof run.delivery.suspendedAt === "number",
);
expect(discarded).toHaveLength(41);
expect(stillSuspended).toHaveLength(10);
expect(discarded[0]?.runId).toBe("run-suspended-pressure-0");
expect(runs[40]?.deliveryDiscardReason).toBe("pressure-pruned");
expect(runs[41]?.pendingFinalDelivery).toBe(true);
expect(runs[40]?.delivery?.discardReason).toBe("pressure-pruned");
expect(runs[41]?.delivery?.status).toBe("suspended");
expect(mocks.persistSubagentRunsToDisk).toHaveBeenCalled();
});
});

View File

@@ -11,7 +11,16 @@ import { createLazyImportLoader, createLazyPromiseLoader } from "../shared/lazy-
import { importRuntimeModule } from "../shared/runtime-import.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js";
import type { DeliveryContext } from "../utils/delivery-context.types.js";
import { removeInternalSessionEffectsTranscript } from "./internal-session-effects.js";
import type { ensureRuntimePluginsLoaded as ensureRuntimePluginsLoadedFn } from "./runtime-plugins.js";
import {
ensureCompletionState,
ensureDeliveryState,
getDeliveryAttemptCount,
getDeliveryLastAttemptAt,
getDeliveryLastError,
isDeliverySuspended,
} from "./subagent-delivery-state.js";
import {
SUBAGENT_ENDED_REASON_COMPLETE,
SUBAGENT_ENDED_REASON_ERROR,
@@ -517,18 +526,14 @@ function resumeSubagentRun(runId: string) {
if (entry.cleanupCompletedAt) {
return;
}
if (
typeof entry.endedAt === "number" &&
entry.pendingFinalDelivery === true &&
typeof entry.deliverySuspendedAt === "number"
) {
if (typeof entry.endedAt === "number" && isDeliverySuspended(entry)) {
return;
}
if (entry.pauseReason === "sessions_yield") {
return;
}
// Skip entries that have exhausted their retry budget or expired (#18264).
if ((entry.announceRetryCount ?? 0) >= MAX_ANNOUNCE_RETRY_COUNT) {
if (getDeliveryAttemptCount(entry) >= MAX_ANNOUNCE_RETRY_COUNT) {
void finalizeResumedAnnounceGiveUp({
runId,
entry,
@@ -550,13 +555,10 @@ function resumeSubagentRun(runId: string) {
}
const now = Date.now();
const delayMs = resolveAnnounceRetryDelayMs(entry.announceRetryCount ?? 0);
const earliestRetryAt = (entry.lastAnnounceRetryAt ?? 0) + delayMs;
if (
entry.expectsCompletionMessage === true &&
entry.lastAnnounceRetryAt &&
now < earliestRetryAt
) {
const lastAttemptAt = getDeliveryLastAttemptAt(entry);
const delayMs = resolveAnnounceRetryDelayMs(getDeliveryAttemptCount(entry));
const earliestRetryAt = (lastAttemptAt ?? 0) + delayMs;
if (entry.expectsCompletionMessage === true && lastAttemptAt && now < earliestRetryAt) {
const waitMs = Math.max(1, earliestRetryAt - now);
const scheduledEntry = entry;
const timer = setTimeout(() => {
@@ -677,11 +679,7 @@ function stopSweeper() {
}
/**
 * Reports whether a run has ended (numeric `endedAt`) while its final completion delivery is
 * suspended.
 *
 * NOTE(review): this body contains two `return` statements, so the second one is unreachable as
 * written. The first reads the flat `pendingFinalDelivery` / `deliverySuspendedAt` fields; the
 * second delegates to `isDeliverySuspended(entry)`. This looks like pre- and post-refactor lines
 * side by side — confirm which one is intended and drop the other.
 */
function isSuspendedPendingFinalDelivery(entry: SubagentRunRecord): boolean {
return (
typeof entry.endedAt === "number" &&
entry.pendingFinalDelivery === true &&
typeof entry.deliverySuspendedAt === "number"
);
return typeof entry.endedAt === "number" && isDeliverySuspended(entry);
}
function resolveSuspendedDeliveryExpiryMs(entry: SubagentRunRecord): number {
@@ -701,30 +699,32 @@ async function discardSuspendedPendingFinalDelivery(
now: number,
reason: "expired" | "pressure-pruned",
): Promise<void> {
const payload = entry.pendingFinalDeliveryPayload;
entry.deliveryDiscardedAt = now;
entry.deliveryDiscardReason = reason;
entry.deliveryDiscardedPayloadSummary = {
const delivery = ensureDeliveryState(entry);
const payload = delivery.payload;
delivery.status = "discarded";
delivery.discardedAt = now;
delivery.discardReason = reason;
delivery.discardedPayloadSummary = {
requesterSessionKey: payload?.requesterSessionKey ?? entry.requesterSessionKey,
childSessionKey: payload?.childSessionKey ?? entry.childSessionKey,
childRunId: payload?.childRunId ?? entry.runId,
endedAt: payload?.endedAt ?? entry.endedAt,
status: payload?.outcome?.status ?? entry.outcome?.status,
lastError: entry.lastAnnounceDeliveryError ?? entry.pendingFinalDeliveryLastError ?? null,
lastError: getDeliveryLastError(entry) ?? null,
};
entry.pendingFinalDelivery = undefined;
entry.pendingFinalDeliveryCreatedAt = undefined;
entry.pendingFinalDeliveryLastAttemptAt = undefined;
entry.pendingFinalDeliveryAttemptCount = undefined;
entry.pendingFinalDeliveryLastError = undefined;
entry.pendingFinalDeliveryPayload = undefined;
entry.deliverySuspendedAt = undefined;
entry.deliverySuspendedReason = undefined;
delivery.payload = undefined;
delivery.createdAt = undefined;
delivery.lastAttemptAt = undefined;
delivery.attemptCount = undefined;
delivery.lastError = undefined;
delivery.suspendedAt = undefined;
delivery.suspendedReason = undefined;
entry.wakeOnDescendantSettle = undefined;
entry.fallbackFrozenResultText = undefined;
entry.fallbackFrozenResultCapturedAt = undefined;
const completion = ensureCompletionState(entry);
completion.fallbackResultText = undefined;
completion.fallbackCapturedAt = undefined;
entry.cleanupHandled = true;
entry.completionAnnouncedAt = undefined;
delivery.announcedAt = undefined;
resumedRuns.delete(runId);
clearPendingLifecycleError(runId);
clearPendingLifecycleTimeout(runId);
@@ -738,6 +738,7 @@ async function discardSuspendedPendingFinalDelivery(
if (shouldDeleteAttachments) {
await safeRemoveAttachmentsDir(entry);
}
await removeInternalSessionEffectsTranscript(entry.execution?.transcriptFile);
const completionReason = entry.endedReason ?? SUBAGENT_ENDED_REASON_COMPLETE;
completeCleanupBookkeeping({
runId,
@@ -779,7 +780,7 @@ async function sweepSubagentRuns() {
suspendedEntries.length - SUSPENDED_DELIVERY_PRESSURE_TARGET,
);
for (const [runId] of suspendedEntries
.toSorted((a, b) => (a[1].deliverySuspendedAt ?? 0) - (b[1].deliverySuspendedAt ?? 0))
.toSorted((a, b) => (a[1].delivery?.suspendedAt ?? 0) - (b[1].delivery?.suspendedAt ?? 0))
.slice(0, pressureCount)) {
pressureDiscardRunIds.add(runId);
}
@@ -793,7 +794,7 @@ async function sweepSubagentRuns() {
}
for (const [runId, entry] of subagentRuns.entries()) {
if (isSuspendedPendingFinalDelivery(entry)) {
const suspendedAgeMs = now - (entry.deliverySuspendedAt ?? now);
const suspendedAgeMs = now - (entry.delivery?.suspendedAt ?? now);
const expired = suspendedAgeMs >= resolveSuspendedDeliveryExpiryMs(entry);
if (expired || pressureDiscardRunIds.has(runId)) {
await discardSuspendedPendingFinalDelivery(
@@ -1093,6 +1094,7 @@ export function replaceSubagentRunAfterSteer(params: {
fallback?: SubagentRunRecord;
runTimeoutSeconds?: number;
preserveFrozenResultFallback?: boolean;
transcriptFile?: string;
}) {
return subagentRunManager.replaceSubagentRunAfterSteer(params);
}

View File

@@ -21,6 +21,56 @@ export type PendingFinalDeliveryPayload = {
wakeOnDescendantSettle?: boolean;
};
/**
 * Execution-side state of a subagent run, stored on `SubagentRunRecord.execution`.
 * NOTE(review): numeric timestamps are presumably epoch milliseconds (tests populate them from
 * `Date.now()`-style values) — confirm.
 */
export type SubagentExecutionState = {
/** Coarse execution phase of the run. */
status: "running" | "interrupted" | "terminal";
startedAt?: number;
endedAt?: number;
outcome?: SubagentRunOutcome;
/** When the run was interrupted; presumably only meaningful for `status: "interrupted"`. */
interruptedAt?: number;
interruptionReason?: "gateway-restart" | "lost-execution-context";
/**
 * Path of the run's private transcript file. Cleanup passes this value to
 * `removeInternalSessionEffectsTranscript`.
 */
transcriptFile?: string;
};
/**
 * Captured completion output of a subagent run, stored on `SubagentRunRecord.completion`.
 * Groups the values previously held in the flat `frozenResult*` / `fallbackFrozenResult*` fields.
 */
export type SubagentCompletionState = {
/** NOTE(review): presumably mirrors whether a completion message is expected — confirm. */
required: boolean;
/** Frozen completion text of the run (formerly `frozenResultText`). */
resultText?: string | null;
/** Timestamp at which `resultText` was captured (formerly `frozenResultCapturedAt`). */
capturedAt?: number;
/** Completion text kept from a replaced run as a fallback (formerly `fallbackFrozenResultText`). */
fallbackResultText?: string | null;
/** Capture timestamp for `fallbackResultText` (formerly `fallbackFrozenResultCapturedAt`). */
fallbackCapturedAt?: number;
};
/**
 * Durable outbox state for delivering a subagent's completion to its parent/external requester,
 * stored on `SubagentRunRecord.delivery`. Groups the values previously held in the flat
 * `pendingFinalDelivery*`, `deliverySuspended*`, `deliveryDiscard*`, `completion*At` and
 * announce-retry fields.
 */
export type SubagentCompletionDeliveryState = {
/** Current delivery phase. */
status:
| "not_required"
| "pending"
| "in_progress"
| "delivered"
| "failed"
| "suspended"
| "discarded";
/** Pending delivery payload; cleared when a suspended delivery is discarded. */
payload?: PendingFinalDeliveryPayload;
createdAt?: number;
enqueuedAt?: number;
deliveredAt?: number;
announcedAt?: number;
/** Timestamp of the most recent delivery attempt; used to compute the next retry time. */
lastAttemptAt?: number;
/** Number of delivery attempts so far; checked against the announce retry limit. */
attemptCount?: number;
lastError?: string | null;
/** Timestamp at which delivery was suspended; the sweeper uses it for expiry and pruning order. */
suspendedAt?: number;
suspendedReason?: "retry-limit" | "expiry";
/** Timestamp at which a suspended delivery was discarded. */
discardedAt?: number;
discardReason?: "expired" | "pressure-pruned";
/** Compact record of the discarded payload, kept after `payload` itself is cleared. */
discardedPayloadSummary?: {
requesterSessionKey?: string;
childSessionKey?: string;
childRunId?: string;
endedAt?: number;
status?: string;
lastError?: string | null;
};
/** Reason the most recent announce was dropped (formerly `lastAnnounceDropReason`). */
lastDropReason?: "queue_cap" | "parent_run_ended" | "sink_unavailable" | "dedupe";
};
export type SubagentRunRecord = {
runId: string;
childSessionKey: string;
@@ -48,41 +98,15 @@ export type SubagentRunRecord = {
cleanupHandled?: boolean;
suppressAnnounceReason?: "steer-restart" | "killed";
expectsCompletionMessage?: boolean;
announceRetryCount?: number;
lastAnnounceRetryAt?: number;
lastAnnounceDeliveryError?: string;
endedReason?: SubagentLifecycleEndedReason;
pauseReason?: "sessions_yield";
wakeOnDescendantSettle?: boolean;
frozenResultText?: string | null;
frozenResultCapturedAt?: number;
fallbackFrozenResultText?: string | null;
fallbackFrozenResultCapturedAt?: number;
execution?: SubagentExecutionState;
completion?: SubagentCompletionState;
/** Set after the subagent_ended hook has been emitted successfully once. */
endedHookEmittedAt?: number;
/** Durable marker that final user delivery still needs a retry/resume pass. */
pendingFinalDelivery?: boolean;
pendingFinalDeliveryCreatedAt?: number;
pendingFinalDeliveryLastAttemptAt?: number;
pendingFinalDeliveryAttemptCount?: number;
pendingFinalDeliveryLastError?: string | null;
pendingFinalDeliveryPayload?: PendingFinalDeliveryPayload;
deliverySuspendedAt?: number;
deliverySuspendedReason?: "retry-limit" | "expiry";
deliveryDiscardedAt?: number;
deliveryDiscardReason?: "expired" | "pressure-pruned";
deliveryDiscardedPayloadSummary?: {
requesterSessionKey?: string;
childSessionKey?: string;
childRunId?: string;
endedAt?: number;
status?: string;
lastError?: string | null;
};
completionEnqueuedAt?: number;
completionDeliveredAt?: number;
completionAnnouncedAt?: number;
lastAnnounceDropReason?: "queue_cap" | "parent_run_ended" | "sink_unavailable" | "dedupe";
/** Durable outbox marker for parent/external completion delivery. */
delivery?: SubagentCompletionDeliveryState;
attachmentsDir?: string;
attachmentsRootDir?: string;
retainAttachmentsOnKeep?: boolean;

View File

@@ -45,7 +45,8 @@ function createDescendantRun(params?: {
task?: string;
cleanup?: "keep" | "delete";
endedAt?: number;
frozenResultText?: string | null;
resultText?: string | null;
executionTranscriptFile?: string;
}) {
return {
runId: params?.runId ?? "run-1",
@@ -56,9 +57,17 @@ function createDescendantRun(params?: {
cleanup: params?.cleanup ?? "keep",
createdAt: 1000,
endedAt: params?.endedAt ?? 2000,
...(params?.frozenResultText === undefined
...(params?.resultText === undefined
? {}
: { frozenResultText: params.frozenResultText }),
: { completion: { required: true, resultText: params.resultText } }),
...(params?.executionTranscriptFile
? {
execution: {
status: "terminal" as const,
transcriptFile: params.executionTranscriptFile,
},
}
: {}),
};
}
@@ -127,7 +136,7 @@ describe("readDescendantSubagentFallbackReply", () => {
vi.mocked(listDescendantRunsForRequester).mockReturnValue([
createDescendantRun({
cleanup: "delete",
frozenResultText: "frozen child output",
resultText: "frozen child output",
}),
]);
vi.mocked(readLatestAssistantReply).mockResolvedValue(undefined);
@@ -140,7 +149,7 @@ describe("readDescendantSubagentFallbackReply", () => {
it("prefers session transcript over frozenResultText", async () => {
vi.mocked(listDescendantRunsForRequester).mockReturnValue([
createDescendantRun({ frozenResultText: "frozen text" }),
createDescendantRun({ resultText: "frozen text" }),
]);
vi.mocked(readLatestAssistantReply).mockResolvedValue("live transcript text");
const result = await readDescendantSubagentFallbackReply({
@@ -150,15 +159,47 @@ describe("readDescendantSubagentFallbackReply", () => {
expect(result).toBe("live transcript text");
});
it("prefers captured completion for internally resumed descendants", async () => {
  // Descendant that carries both captured completion text and an execution transcript file.
  const resumedDescendant = createDescendantRun({
    resultText: "fresh recovered output",
    executionTranscriptFile: "/tmp/openclaw-internal-run.jsonl",
  });
  vi.mocked(listDescendantRunsForRequester).mockReturnValue([resumedDescendant]);
  // The visible transcript deliberately disagrees with the captured text so the
  // assertion below shows which source the fallback reply was taken from.
  vi.mocked(readLatestAssistantReply).mockResolvedValue("stale visible transcript");

  const fallbackReply = await readDescendantSubagentFallbackReply({
    sessionKey: "test-session",
    runStartedAt,
  });

  expect(fallbackReply).toBe("fresh recovered output");
});
it("does not fall back to visible transcript for internally resumed descendants without captured output", async () => {
  // Descendant with an execution transcript file but no captured completion text.
  const emptyResumedDescendant = createDescendantRun({
    resultText: null,
    executionTranscriptFile: "/tmp/openclaw-empty-internal-run.jsonl",
  });
  vi.mocked(listDescendantRunsForRequester).mockReturnValue([emptyResumedDescendant]);

  // Reset recorded calls first so the "not called" assertion only sees this test's activity.
  const transcriptReader = vi.mocked(readLatestAssistantReply);
  transcriptReader.mockClear();
  transcriptReader.mockResolvedValue("stale visible transcript");

  const fallbackReply = await readDescendantSubagentFallbackReply({
    sessionKey: "test-session",
    runStartedAt,
  });

  expect(fallbackReply).toBeUndefined();
  expect(readLatestAssistantReply).not.toHaveBeenCalled();
});
it("joins replies from multiple descendants", async () => {
vi.mocked(listDescendantRunsForRequester).mockReturnValue([
createDescendantRun({ frozenResultText: "first child output" }),
createDescendantRun({ resultText: "first child output" }),
createDescendantRun({
runId: "run-2",
childSessionKey: "child-2",
task: "task-2",
endedAt: 3000,
frozenResultText: "second child output",
resultText: "second child output",
}),
]);
vi.mocked(readLatestAssistantReply).mockResolvedValue(undefined);
@@ -177,7 +218,7 @@ describe("readDescendantSubagentFallbackReply", () => {
childSessionKey: "child-2",
task: "task-2",
endedAt: 3000,
frozenResultText: "useful output",
resultText: "useful output",
}),
]);
vi.mocked(readLatestAssistantReply).mockImplementation(async (params) => {
@@ -193,11 +234,11 @@ describe("readDescendantSubagentFallbackReply", () => {
expect(result).toBe("useful output");
});
it("returns undefined when frozenResultText is null", async () => {
it("returns undefined when completion result is null", async () => {
vi.mocked(listDescendantRunsForRequester).mockReturnValue([
createDescendantRun({
cleanup: "delete",
frozenResultText: null,
resultText: null,
}),
]);
vi.mocked(readLatestAssistantReply).mockResolvedValue(undefined);
@@ -219,7 +260,7 @@ describe("readDescendantSubagentFallbackReply", () => {
cleanup: "keep",
createdAt: 500,
endedAt: 900,
frozenResultText: "stale output from previous run",
completion: { required: true, resultText: "stale output from previous run" },
},
]);
vi.mocked(readLatestAssistantReply).mockResolvedValue(undefined);

View File

@@ -45,11 +45,21 @@ export async function readDescendantSubagentFallbackReply(params: {
.toSorted((a, b) => (a.endedAt ?? 0) - (b.endedAt ?? 0))
.slice(-4);
for (const entry of latestRuns) {
let reply = (await readLatestAssistantReply({ sessionKey: entry.childSessionKey }))?.trim();
const frozenResultText = entry.completion?.resultText;
const frozenReply =
typeof frozenResultText === "string" && frozenResultText.trim()
? frozenResultText.trim()
: undefined;
const usesInternalTranscript = typeof entry.execution?.transcriptFile === "string";
let reply = usesInternalTranscript ? frozenReply : undefined;
if (!reply && !usesInternalTranscript) {
reply = (await readLatestAssistantReply({ sessionKey: entry.childSessionKey }))?.trim();
}
// Fall back to the registry's frozen result text when the session transcript
// is unavailable (e.g. child session already deleted by announce cleanup).
if (!reply && typeof entry.frozenResultText === "string" && entry.frozenResultText.trim()) {
reply = entry.frozenResultText.trim();
// is unavailable (e.g. child session already deleted by announce cleanup) or
// intentionally bypassed by an internal interrupted-resume run.
if (!reply && frozenReply) {
reply = frozenReply;
}
if (!reply || reply.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) {
continue;

View File

@@ -666,6 +666,28 @@ describe("callGateway url resolution", () => {
expect(lastClientOptions?.clientDisplayName).toBe("gateway:sessions.delete");
});
it("sends internal agent handoffs as backend gateway calls", async () => {
  // The two internal controls that must reach the gateway request unchanged.
  const expectedInternalControls = {
    sessionEffects: "internal",
    suppressPromptPersistence: true,
  };

  setLocalLoopbackGatewayConfig();
  helloMethods = ["agent"];

  await callGateway({
    method: "agent",
    params: {
      message: "resume",
      sessionEffects: "internal",
      suppressPromptPersistence: true,
    },
  });

  // The call is made with the gateway client identity in backend mode.
  expect(lastClientOptions?.clientName).toBe(GATEWAY_CLIENT_NAMES.GATEWAY_CLIENT);
  expect(lastClientOptions?.mode).toBe(GATEWAY_CLIENT_MODES.BACKEND);

  // The request keeps its method and carries both internal controls.
  expect(lastRequestOptions?.method).toBe("agent");
  expect(lastRequestOptions?.params).toMatchObject(expectedInternalControls);
});
it("passes approval runtime tokens to backend gateway clients", async () => {
setLocalLoopbackGatewayConfig();

View File

@@ -194,6 +194,8 @@ export const AgentParamsSchema = Type.Object(
internalRuntimeHandoffId: Type.Optional(NonEmptyString),
internalEvents: Type.Optional(Type.Array(AgentInternalEventSchema)),
inputProvenance: Type.Optional(InputProvenanceSchema),
suppressPromptPersistence: Type.Optional(Type.Boolean()),
sessionEffects: Type.Optional(Type.Union([Type.Literal("visible"), Type.Literal("internal")])),
sourceReplyDeliveryMode: Type.Optional(
Type.Union([Type.Literal("automatic"), Type.Literal("message_tool_only")]),
),

View File

@@ -1639,6 +1639,66 @@ describe("gateway agent handler", () => {
expect(callArgs.message).toContain("sourceTool=subagent_announce");
});
it("rejects public internal session-effect controls", async () => {
  const reservedControlsMessage =
    "internal session-effect controls are reserved for backend callers.";
  // Each entry sets exactly one of the reserved controls on an otherwise ordinary request.
  const forgedControls = [
    { sessionEffects: "internal" as const, idempotencyKey: "test-public-internal-effects" },
    { suppressPromptPersistence: true, idempotencyKey: "test-public-prompt-suppress" },
  ];

  primeMainAgentRun({ cfg: mocks.loadConfigReturn });
  mocks.agentCommand.mockClear();

  // Requests are sent one after another; no client is supplied for any of them.
  for (const forged of forgedControls) {
    const respond = await invokeAgent(
      {
        message: "forged internal control",
        agentId: "main",
        sessionKey: "agent:main:main",
        ...forged,
      },
      { reqId: forged.idempotencyKey, flushDispatch: false },
    );
    expectRespondError(respond, { message: reservedControlsMessage });
  }

  // Neither rejected request may reach the agent command.
  expect(mocks.agentCommand).not.toHaveBeenCalled();
});
// Verifies that a request carrying both internal controls, sent with the backend
// gateway client, is forwarded to the agent command while the visible bookkeeping
// calls checked at the end are skipped.
it("keeps backend internal session-effect runs out of visible gateway state", async () => {
primeMainAgentRun({ cfg: mocks.loadConfigReturn });
// Clear recorded calls so the assertions below only observe this invocation.
mocks.agentCommand.mockClear();
mocks.registerAgentRunContext.mockClear();
// Keep a handle on the context so its addChatRun mock can be inspected afterwards.
const context = makeContext();
await invokeAgent(
{
message: "internal resume",
agentId: "main",
sessionKey: "agent:main:main",
sessionEffects: "internal",
suppressPromptPersistence: true,
idempotencyKey: "test-backend-internal-effects",
},
{
reqId: "backend-internal-effects",
client: backendGatewayClient(),
context,
},
);
// Wait for the agent command invocation and capture the arguments it received.
const callArgs = await waitForAgentCommandCall<{
sessionEffects?: string;
suppressPromptPersistence?: boolean;
}>();
// Both controls must be passed through to the agent command.
expect(callArgs.sessionEffects).toBe("internal");
expect(callArgs.suppressPromptPersistence).toBe(true);
// No session-store update and no chat-run registration for this run.
expect(mocks.updateSessionStore).not.toHaveBeenCalled();
expect(context.addChatRun).not.toHaveBeenCalled();
// The run context is registered under the idempotency key with only the
// isControlUiVisible flag; no sessionKey is expected in the payload.
expect(mocks.registerAgentRunContext).toHaveBeenCalledWith("test-backend-internal-effects", {
isControlUiVisible: false,
});
});
it("rejects public transcriptMessage overrides", async () => {
primeMainAgentRun({ cfg: mocks.loadConfigReturn });
mocks.agentCommand.mockClear();

View File

@@ -767,6 +767,8 @@ export const agentHandlers: GatewayRequestHandlers = {
acpTurnSource?: "manual_spawn";
internalRuntimeHandoffId?: string;
internalEvents?: AgentInternalEvent[];
suppressPromptPersistence?: boolean;
sessionEffects?: "visible" | "internal";
idempotencyKey: string;
sourceReplyDeliveryMode?: "automatic" | "message_tool_only";
disableMessageTool?: boolean;
@@ -782,6 +784,8 @@ export const agentHandlers: GatewayRequestHandlers = {
const canResetSession = resolveCanResetSessionFromClient(client);
const canUseInternalRuntimeHandoff = resolveCanUseInternalRuntimeHandoff(client);
const requestedModelOverride = Boolean(request.provider || request.model);
const requestedInternalSessionEffects = request.sessionEffects === "internal";
const requestedPromptPersistenceSuppression = request.suppressPromptPersistence === true;
const isRawModelRun = request.modelRun === true || request.promptMode === "none";
if (requestedModelOverride && !allowModelOverride) {
respond(
@@ -794,6 +798,20 @@ export const agentHandlers: GatewayRequestHandlers = {
);
return;
}
if (
(requestedInternalSessionEffects || requestedPromptPersistenceSuppression) &&
!canUseInternalRuntimeHandoff
) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"internal session-effect controls are reserved for backend callers.",
),
);
return;
}
const providerOverride = allowModelOverride ? request.provider : undefined;
const modelOverride = allowModelOverride ? request.model : undefined;
const cfg = context.getRuntimeConfig();
@@ -821,6 +839,8 @@ export const agentHandlers: GatewayRequestHandlers = {
let resolvedGroupSpace: string | undefined = normalizedSpawned.groupSpace;
let spawnedByValue: string | undefined;
const inputProvenance = normalizeInputProvenance(request.inputProvenance);
const sessionEffects = requestedInternalSessionEffects ? "internal" : request.sessionEffects;
const suppressVisibleSessionEffects = sessionEffects === "internal";
const agentDedupeKeys = resolveAgentDedupeKeys({
idempotencyKey: idem,
execApprovalFollowupApprovalId,
@@ -1445,7 +1465,7 @@ export const agentHandlers: GatewayRequestHandlers = {
agentId,
}).sessionStartedAt
: undefined;
if (storePath) {
if (storePath && !suppressVisibleSessionEffects) {
const requestedStoreKey = requestedSessionKey;
let deniedBySendPolicy = false;
const persisted = await updateSessionStore(storePath, (store) => {
@@ -1514,7 +1534,10 @@ export const agentHandlers: GatewayRequestHandlers = {
return;
}
}
if (canonicalSessionKey === mainSessionKey || canonicalSessionKey === "global") {
if (
!suppressVisibleSessionEffects &&
(canonicalSessionKey === mainSessionKey || canonicalSessionKey === "global")
) {
context.addChatRun(idem, {
sessionKey: canonicalSessionKey,
clientRunId: idem,
@@ -1523,7 +1546,12 @@ export const agentHandlers: GatewayRequestHandlers = {
bestEffortDeliver = true;
}
}
registerAgentRunContext(idem, { sessionKey: canonicalSessionKey });
registerAgentRunContext(
idem,
suppressVisibleSessionEffects
? { isControlUiVisible: false }
: { sessionKey: canonicalSessionKey },
);
}
const connId = typeof client?.connId === "string" ? client.connId : undefined;
@@ -1921,12 +1949,15 @@ export const agentHandlers: GatewayRequestHandlers = {
acpTurnSource: request.acpTurnSource,
internalEvents: request.internalEvents,
inputProvenance,
sessionEffects,
sourceReplyDeliveryMode: request.sourceReplyDeliveryMode,
disableMessageTool: request.disableMessageTool,
suppressPromptPersistence: shouldSuppressAgentPromptPersistence({
inputProvenance,
internalEvents: request.internalEvents,
}),
suppressPromptPersistence:
requestedPromptPersistenceSuppression ||
shouldSuppressAgentPromptPersistence({
inputProvenance,
internalEvents: request.internalEvents,
}),
cleanupBundleMcpOnRunEnd: request.cleanupBundleMcpOnRunEnd,
abortSignal: activeRunAbort.controller.signal,
onActiveModelSelected: ({ provider }) => {