diff --git a/src/agents/pi-embedded-runner/run.incomplete-turn.test.ts b/src/agents/pi-embedded-runner/run.incomplete-turn.test.ts index e8feb41d48e..8d05e4e6e6d 100644 --- a/src/agents/pi-embedded-runner/run.incomplete-turn.test.ts +++ b/src/agents/pi-embedded-runner/run.incomplete-turn.test.ts @@ -16,6 +16,8 @@ import { resolvePlanningOnlyRetryLimit, resolvePlanningOnlyRetryInstruction, STRICT_AGENTIC_BLOCKED_TEXT, + resolveReplayInvalidFlag, + resolveRunLivenessState, } from "./run/incomplete-turn.js"; import type { EmbeddedRunAttemptResult } from "./run/types.js"; @@ -257,4 +259,45 @@ describe("runEmbeddedPiAgent incomplete-turn safety", () => { steps: ["I'll inspect the code.", "Then I'll patch the issue.", "Finally I'll run tests."], }); }); + + it("marks incomplete-turn retries as replay-invalid abandoned runs", () => { + const attempt = makeAttemptResult({ + assistantTexts: [], + lastAssistant: { + stopReason: "toolUse", + provider: "openai", + model: "gpt-5.4", + content: [], + } as unknown as EmbeddedRunAttemptResult["lastAssistant"], + }); + const incompleteTurnText = "⚠️ Agent couldn't generate a response. Please try again."; + + expect(resolveReplayInvalidFlag({ attempt, incompleteTurnText })).toBe(true); + expect( + resolveRunLivenessState({ + payloadCount: 0, + aborted: false, + timedOut: false, + attempt, + incompleteTurnText, + }), + ).toBe("abandoned"); + }); + + it("marks compaction-timeout retries as paused and replay-invalid", () => { + const attempt = makeAttemptResult({ + promptErrorSource: "compaction", + timedOutDuringCompaction: true, + }); + + expect(resolveReplayInvalidFlag({ attempt })).toBe(true); + expect( + resolveRunLivenessState({ + payloadCount: 0, + aborted: true, + timedOut: true, + attempt, + }), + ).toBe("paused"); + }); }); diff --git a/src/agents/pi-embedded-runner/run.timeout-triggered-compaction.test.ts b/src/agents/pi-embedded-runner/run.timeout-triggered-compaction.test.ts index 0ab65ce79c5..31b5ad859c5 100644 --- a/src/agents/pi-embedded-runner/run.timeout-triggered-compaction.test.ts +++ b/src/agents/pi-embedded-runner/run.timeout-triggered-compaction.test.ts @@ -200,6 +200,7 @@ describe("timeout-triggered compaction", () => { expect(mockedCompactDirect).toHaveBeenCalledTimes(1); expect(result.payloads?.[0]?.isError).toBe(true); expect(result.payloads?.[0]?.text).toContain("timed out"); + expect(result.meta.livenessState).toBe("blocked"); }); it("does not attempt compaction when prompt token usage is low", async () => { diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 9e1b6745f46..5f979c9be63 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -101,6 +101,8 @@ import { resolvePlanningOnlyRetryLimit, resolvePlanningOnlyRetryInstruction, STRICT_AGENTIC_BLOCKED_TEXT, + resolveReplayInvalidFlag, + resolveRunLivenessState, } from "./run/incomplete-turn.js"; import type { RunEmbeddedPiAgentParams } from "./run/params.js"; import { buildEmbeddedRunPayloads } from "./run/payloads.js"; @@ -1099,6 +1101,10 @@ export async function runEmbeddedPiAgent( ); } const kind = isCompactionFailure ? "compaction_failure" : "context_overflow"; + attempt.setTerminalLifecycleMeta?.({ + replayInvalid: resolveReplayInvalidFlag({ attempt }), + livenessState: "blocked", + }); return { payloads: [ { @@ -1120,6 +1126,8 @@ export async function runEmbeddedPiAgent( lastTurnTotal, }), systemPromptReport: attempt.systemPromptReport, + replayInvalid: resolveReplayInvalidFlag({ attempt }), + livenessState: "blocked", error: { kind, message: errorText }, }, }; @@ -1147,6 +1155,10 @@ export async function runEmbeddedPiAgent( } // Handle role ordering errors with a user-friendly message if (/incorrect role information|roles must alternate/i.test(errorText)) { + attempt.setTerminalLifecycleMeta?.({ + replayInvalid: resolveReplayInvalidFlag({ attempt }), + livenessState: "blocked", + }); return { payloads: [ { @@ -1168,6 +1180,8 @@ export async function runEmbeddedPiAgent( lastTurnTotal, }), systemPromptReport: attempt.systemPromptReport, + replayInvalid: resolveReplayInvalidFlag({ attempt }), + livenessState: "blocked", error: { kind: "role_ordering", message: errorText }, }, }; @@ -1179,6 +1193,10 @@ export async function runEmbeddedPiAgent( const maxMbLabel = typeof maxMb === "number" && Number.isFinite(maxMb) ? `${maxMb}` : null; const maxBytesHint = maxMbLabel ? ` (max ${maxMbLabel}MB)` : ""; + attempt.setTerminalLifecycleMeta?.({ + replayInvalid: resolveReplayInvalidFlag({ attempt }), + livenessState: "blocked", + }); return { payloads: [ { @@ -1200,6 +1218,8 @@ export async function runEmbeddedPiAgent( lastTurnTotal, }), systemPromptReport: attempt.systemPromptReport, + replayInvalid: resolveReplayInvalidFlag({ attempt }), + livenessState: "blocked", error: { kind: "image_size", message: errorText }, }, }; @@ -1496,6 +1516,17 @@ export async function runEmbeddedPiAgent( aborted, systemPromptReport: attempt.systemPromptReport, finalAssistantVisibleText, + replayInvalid: resolveReplayInvalidFlag({ + attempt, + incompleteTurnText: null, + }), + livenessState: resolveRunLivenessState({ + payloadCount: payloads.length, + aborted, + timedOut, + attempt, + incompleteTurnText: null, + }), }, didSendViaMessagingTool: attempt.didSendViaMessagingTool, didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt, @@ -1616,6 +1647,17 @@ export async function runEmbeddedPiAgent( aborted, systemPromptReport: attempt.systemPromptReport, finalAssistantVisibleText, + replayInvalid: resolveReplayInvalidFlag({ + attempt, + incompleteTurnText, + }), + livenessState: resolveRunLivenessState({ + payloadCount: payloads.length, + aborted, + timedOut, + attempt, + incompleteTurnText, + }), }, didSendViaMessagingTool: attempt.didSendViaMessagingTool, didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt, @@ -1650,6 +1692,17 @@ export async function runEmbeddedPiAgent( aborted, systemPromptReport: attempt.systemPromptReport, finalAssistantVisibleText, + replayInvalid: resolveReplayInvalidFlag({ + attempt, + incompleteTurnText: null, + }), + livenessState: resolveRunLivenessState({ + payloadCount: payloads.length, + aborted, + timedOut, + attempt, + incompleteTurnText: null, + }), // Handle client tool calls (OpenResponses hosted tools) // Propagate the LLM stop reason so callers (lifecycle events, // ACP bridge) can distinguish end_turn from max_tokens. diff --git a/src/agents/pi-embedded-runner/run/incomplete-turn.ts b/src/agents/pi-embedded-runner/run/incomplete-turn.ts index 1f870abcf26..dceb721120e 100644 --- a/src/agents/pi-embedded-runner/run/incomplete-turn.ts +++ b/src/agents/pi-embedded-runner/run/incomplete-turn.ts @@ -1,6 +1,7 @@ import type { EmbeddedPiExecutionContract } from "../../../config/types.agent-defaults.js"; import { normalizeLowercaseStringOrEmpty } from "../../../shared/string-coerce.js"; import { isLikelyMutatingToolName } from "../../tool-mutation.js"; +import type { EmbeddedRunLivenessState } from "../types.js"; import type { EmbeddedRunAttemptResult } from "./types.js"; type ReplayMetadataAttempt = Pick< @@ -16,6 +17,8 @@ type IncompleteTurnAttempt = Pick< | "lastToolError" | "lastAssistant" | "replayMetadata" + | "promptErrorSource" + | "timedOutDuringCompaction" >; type PlanningOnlyAttempt = Pick< @@ -32,6 +35,11 @@ type PlanningOnlyAttempt = Pick< | "toolMetas" >; +type RunLivenessAttempt = Pick< + EmbeddedRunAttemptResult, + "lastAssistant" | "promptErrorSource" | "replayMetadata" | "timedOutDuringCompaction" +>; + const PLANNING_ONLY_PROMISE_RE = /\b(?:i(?:'ll| will)|let me|going to|first[, ]+i(?:'ll| will)|next[, ]+i(?:'ll| will)|i can do that)\b/i; const PLANNING_ONLY_COMPLETION_RE = @@ -134,6 +142,43 @@ export function resolveIncompleteTurnPayloadText(params: { : "⚠️ Agent couldn't generate a response. Please try again."; } +export function resolveReplayInvalidFlag(params: { + attempt: RunLivenessAttempt; + incompleteTurnText?: string | null; +}): boolean { + return ( + !params.attempt.replayMetadata.replaySafe || + params.attempt.promptErrorSource === "compaction" || + params.attempt.timedOutDuringCompaction || + Boolean(params.incompleteTurnText) + ); +} + +export function resolveRunLivenessState(params: { + payloadCount: number; + aborted: boolean; + timedOut: boolean; + attempt: RunLivenessAttempt; + incompleteTurnText?: string | null; +}): EmbeddedRunLivenessState { + if (params.incompleteTurnText) { + return "abandoned"; + } + if ( + params.attempt.promptErrorSource === "compaction" || + params.attempt.timedOutDuringCompaction + ) { + return "paused"; + } + if ((params.aborted || params.timedOut) && params.payloadCount === 0) { + return "blocked"; + } + if (params.attempt.lastAssistant?.stopReason === "error") { + return "blocked"; + } + return "working"; +} + function shouldApplyPlanningOnlyRetryGuard(params: { provider?: string; modelId?: string; diff --git a/src/agents/pi-embedded-runner/types.ts b/src/agents/pi-embedded-runner/types.ts index 1c01c40230f..ca143fb1375 100644 --- a/src/agents/pi-embedded-runner/types.ts +++ b/src/agents/pi-embedded-runner/types.ts @@ -31,12 +31,16 @@ export type EmbeddedPiAgentMeta = { }; }; +export type EmbeddedRunLivenessState = "working" | "paused" | "blocked" | "abandoned"; + export type EmbeddedPiRunMeta = { durationMs: number; agentMeta?: EmbeddedPiAgentMeta; aborted?: boolean; systemPromptReport?: SessionSystemPromptReport; finalAssistantVisibleText?: string; + replayInvalid?: boolean; + livenessState?: EmbeddedRunLivenessState; error?: { kind: | "context_overflow" diff --git a/src/agents/pi-embedded-subscribe.handlers.compaction.ts b/src/agents/pi-embedded-subscribe.handlers.compaction.ts index 69adc9e3b3b..3875adf6c1e 100644 --- a/src/agents/pi-embedded-subscribe.handlers.compaction.ts +++ b/src/agents/pi-embedded-subscribe.handlers.compaction.ts @@ -6,6 +6,7 @@ import { makeZeroUsageSnapshot } from "./usage.js"; export function handleAutoCompactionStart(ctx: EmbeddedPiSubscribeContext) { ctx.state.compactionInFlight = true; + ctx.state.livenessState = "paused"; ctx.ensureCompactionPromise(); ctx.log.debug(`embedded run compaction start: runId=${ctx.params.runId}`); emitAgentEvent({ @@ -67,6 +68,7 @@ export function handleAutoCompactionEnd( ctx.resetForCompactionRetry(); ctx.log.debug(`embedded run compaction retry: runId=${ctx.params.runId}`); } else { + ctx.state.livenessState = "working"; ctx.maybeResolveCompactionWait(); clearStaleAssistantUsageOnSessionMessages(ctx); } diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts index c5b08dc5457..ad9d195a321 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts @@ -78,6 +78,7 @@ describe("handleAgentEnd", () => { data: { phase: "error", error: "LLM request failed: connection refused by the provider endpoint.", + livenessState: "blocked", }, }); }); @@ -191,6 +192,24 @@ describe("handleAgentEnd", () => { expect(ctx.log.debug).toHaveBeenCalledWith("embedded run agent end: runId=run-1 isError=false"); }); + it("surfaces replay-invalid paused lifecycle end state when present", async () => { + const onAgentEvent = vi.fn(); + const ctx = createContext(undefined, { onAgentEvent }); + ctx.state.replayInvalid = true; + ctx.state.livenessState = "paused"; + + await handleAgentEnd(ctx); + + expect(onAgentEvent).toHaveBeenCalledWith({ + stream: "lifecycle", + data: { + phase: "end", + livenessState: "paused", + replayInvalid: true, + }, + }); + }); + it("flushes orphaned tool media as a media-only block reply", async () => { const ctx = createContext(undefined); ctx.state.pendingToolMediaUrls = ["/tmp/reply.opus"]; diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts index 926d2818075..753d4ec65fa 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts @@ -39,6 +39,9 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise< const lastAssistant = ctx.state.lastAssistant; const isError = isAssistantMessage(lastAssistant) && lastAssistant.stopReason === "error"; let lifecycleErrorText: string | undefined; + const replayInvalid = ctx.state.replayInvalid === true ? true : undefined; + const livenessState = + ctx.state.livenessState === "working" && isError ? "blocked" : ctx.state.livenessState; if (isError && lastAssistant) { const friendlyError = formatAssistantErrorText(lastAssistant, { @@ -89,6 +92,8 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise< data: { phase: "error", error: lifecycleErrorText ?? "LLM request failed.", + ...(livenessState ? { livenessState } : {}), + ...(replayInvalid ? { replayInvalid } : {}), endedAt: Date.now(), }, }); @@ -97,6 +102,8 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise< data: { phase: "error", error: lifecycleErrorText ?? "LLM request failed.", + ...(livenessState ? { livenessState } : {}), + ...(replayInvalid ? { replayInvalid } : {}), }, }); return; @@ -106,12 +113,18 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise< stream: "lifecycle", data: { phase: "end", + ...(livenessState ? { livenessState } : {}), + ...(replayInvalid ? { replayInvalid } : {}), endedAt: Date.now(), }, }); void ctx.params.onAgentEvent?.({ stream: "lifecycle", - data: { phase: "end" }, + data: { + phase: "end", + ...(livenessState ? { livenessState } : {}), + ...(replayInvalid ? { replayInvalid } : {}), + }, }); }; diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index d8a886b7a93..b015cc37366 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -6,6 +6,7 @@ import type { HookRunner } from "../plugins/hooks.js"; import type { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js"; import type { MessagingToolSend } from "./pi-embedded-messaging.js"; import type { BlockReplyPayload } from "./pi-embedded-payloads.js"; +import type { EmbeddedRunLivenessState } from "./pi-embedded-runner/types.js"; import type { BlockReplyChunking, SubscribeEmbeddedPiSessionParams, @@ -64,6 +65,8 @@ export type EmbeddedPiSubscribeState = { compactionRetryReject?: (reason?: unknown) => void; compactionRetryPromise: Promise | null; unsubscribed: boolean; + replayInvalid?: boolean; + livenessState?: EmbeddedRunLivenessState; messagingToolSentTexts: string[]; messagingToolSentTextsNormalized: string[]; diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 58651f869d8..92c0f4cf5eb 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -104,6 +104,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar compactionRetryReject: undefined, compactionRetryPromise: null, unsubscribed: false, + replayInvalid: false, + livenessState: "working", messagingToolSentTexts: [], messagingToolSentTextsNormalized: [], messagingToolSentTargets: [], @@ -691,6 +693,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar state.pendingToolAudioAsVoice = false; state.deterministicApprovalPromptPending = false; state.deterministicApprovalPromptSent = false; + state.replayInvalid = false; + state.livenessState = "working"; resetAssistantMessageState(0); };