fix: surface replay and liveness state

This commit is contained in:
Eva
2026-04-10 21:36:22 +07:00
committed by Peter Steinberger
parent 1038c1b8f3
commit aa5bec4bdf
10 changed files with 188 additions and 1 deletions

View File

@@ -16,6 +16,8 @@ import {
resolvePlanningOnlyRetryLimit,
resolvePlanningOnlyRetryInstruction,
STRICT_AGENTIC_BLOCKED_TEXT,
resolveReplayInvalidFlag,
resolveRunLivenessState,
} from "./run/incomplete-turn.js";
import type { EmbeddedRunAttemptResult } from "./run/types.js";
@@ -257,4 +259,45 @@ describe("runEmbeddedPiAgent incomplete-turn safety", () => {
steps: ["I'll inspect the code.", "Then I'll patch the issue.", "Finally I'll run tests."],
});
});
it("marks incomplete-turn retries as replay-invalid abandoned runs", () => {
const attempt = makeAttemptResult({
assistantTexts: [],
lastAssistant: {
stopReason: "toolUse",
provider: "openai",
model: "gpt-5.4",
content: [],
} as unknown as EmbeddedRunAttemptResult["lastAssistant"],
});
const incompleteTurnText = "⚠️ Agent couldn't generate a response. Please try again.";
expect(resolveReplayInvalidFlag({ attempt, incompleteTurnText })).toBe(true);
expect(
resolveRunLivenessState({
payloadCount: 0,
aborted: false,
timedOut: false,
attempt,
incompleteTurnText,
}),
).toBe("abandoned");
});
it("marks compaction-timeout retries as paused and replay-invalid", () => {
const attempt = makeAttemptResult({
promptErrorSource: "compaction",
timedOutDuringCompaction: true,
});
expect(resolveReplayInvalidFlag({ attempt })).toBe(true);
expect(
resolveRunLivenessState({
payloadCount: 0,
aborted: true,
timedOut: true,
attempt,
}),
).toBe("paused");
});
});

View File

@@ -200,6 +200,7 @@ describe("timeout-triggered compaction", () => {
expect(mockedCompactDirect).toHaveBeenCalledTimes(1);
expect(result.payloads?.[0]?.isError).toBe(true);
expect(result.payloads?.[0]?.text).toContain("timed out");
expect(result.meta.livenessState).toBe("blocked");
});
it("does not attempt compaction when prompt token usage is low", async () => {

View File

@@ -101,6 +101,8 @@ import {
resolvePlanningOnlyRetryLimit,
resolvePlanningOnlyRetryInstruction,
STRICT_AGENTIC_BLOCKED_TEXT,
resolveReplayInvalidFlag,
resolveRunLivenessState,
} from "./run/incomplete-turn.js";
import type { RunEmbeddedPiAgentParams } from "./run/params.js";
import { buildEmbeddedRunPayloads } from "./run/payloads.js";
@@ -1099,6 +1101,10 @@ export async function runEmbeddedPiAgent(
);
}
const kind = isCompactionFailure ? "compaction_failure" : "context_overflow";
attempt.setTerminalLifecycleMeta?.({
replayInvalid: resolveReplayInvalidFlag({ attempt }),
livenessState: "blocked",
});
return {
payloads: [
{
@@ -1120,6 +1126,8 @@ export async function runEmbeddedPiAgent(
lastTurnTotal,
}),
systemPromptReport: attempt.systemPromptReport,
replayInvalid: resolveReplayInvalidFlag({ attempt }),
livenessState: "blocked",
error: { kind, message: errorText },
},
};
@@ -1147,6 +1155,10 @@ export async function runEmbeddedPiAgent(
}
// Handle role ordering errors with a user-friendly message
if (/incorrect role information|roles must alternate/i.test(errorText)) {
attempt.setTerminalLifecycleMeta?.({
replayInvalid: resolveReplayInvalidFlag({ attempt }),
livenessState: "blocked",
});
return {
payloads: [
{
@@ -1168,6 +1180,8 @@ export async function runEmbeddedPiAgent(
lastTurnTotal,
}),
systemPromptReport: attempt.systemPromptReport,
replayInvalid: resolveReplayInvalidFlag({ attempt }),
livenessState: "blocked",
error: { kind: "role_ordering", message: errorText },
},
};
@@ -1179,6 +1193,10 @@ export async function runEmbeddedPiAgent(
const maxMbLabel =
typeof maxMb === "number" && Number.isFinite(maxMb) ? `${maxMb}` : null;
const maxBytesHint = maxMbLabel ? ` (max ${maxMbLabel}MB)` : "";
attempt.setTerminalLifecycleMeta?.({
replayInvalid: resolveReplayInvalidFlag({ attempt }),
livenessState: "blocked",
});
return {
payloads: [
{
@@ -1200,6 +1218,8 @@ export async function runEmbeddedPiAgent(
lastTurnTotal,
}),
systemPromptReport: attempt.systemPromptReport,
replayInvalid: resolveReplayInvalidFlag({ attempt }),
livenessState: "blocked",
error: { kind: "image_size", message: errorText },
},
};
@@ -1496,6 +1516,17 @@ export async function runEmbeddedPiAgent(
aborted,
systemPromptReport: attempt.systemPromptReport,
finalAssistantVisibleText,
replayInvalid: resolveReplayInvalidFlag({
attempt,
incompleteTurnText: null,
}),
livenessState: resolveRunLivenessState({
payloadCount: payloads.length,
aborted,
timedOut,
attempt,
incompleteTurnText: null,
}),
},
didSendViaMessagingTool: attempt.didSendViaMessagingTool,
didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt,
@@ -1616,6 +1647,17 @@ export async function runEmbeddedPiAgent(
aborted,
systemPromptReport: attempt.systemPromptReport,
finalAssistantVisibleText,
replayInvalid: resolveReplayInvalidFlag({
attempt,
incompleteTurnText,
}),
livenessState: resolveRunLivenessState({
payloadCount: payloads.length,
aborted,
timedOut,
attempt,
incompleteTurnText,
}),
},
didSendViaMessagingTool: attempt.didSendViaMessagingTool,
didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt,
@@ -1650,6 +1692,17 @@ export async function runEmbeddedPiAgent(
aborted,
systemPromptReport: attempt.systemPromptReport,
finalAssistantVisibleText,
replayInvalid: resolveReplayInvalidFlag({
attempt,
incompleteTurnText: null,
}),
livenessState: resolveRunLivenessState({
payloadCount: payloads.length,
aborted,
timedOut,
attempt,
incompleteTurnText: null,
}),
// Handle client tool calls (OpenResponses hosted tools)
// Propagate the LLM stop reason so callers (lifecycle events,
// ACP bridge) can distinguish end_turn from max_tokens.

View File

@@ -1,6 +1,7 @@
import type { EmbeddedPiExecutionContract } from "../../../config/types.agent-defaults.js";
import { normalizeLowercaseStringOrEmpty } from "../../../shared/string-coerce.js";
import { isLikelyMutatingToolName } from "../../tool-mutation.js";
import type { EmbeddedRunLivenessState } from "../types.js";
import type { EmbeddedRunAttemptResult } from "./types.js";
type ReplayMetadataAttempt = Pick<
@@ -16,6 +17,8 @@ type IncompleteTurnAttempt = Pick<
| "lastToolError"
| "lastAssistant"
| "replayMetadata"
| "promptErrorSource"
| "timedOutDuringCompaction"
>;
type PlanningOnlyAttempt = Pick<
@@ -32,6 +35,11 @@ type PlanningOnlyAttempt = Pick<
| "toolMetas"
>;
type RunLivenessAttempt = Pick<
EmbeddedRunAttemptResult,
"lastAssistant" | "promptErrorSource" | "replayMetadata" | "timedOutDuringCompaction"
>;
const PLANNING_ONLY_PROMISE_RE =
/\b(?:i(?:'ll| will)|let me|going to|first[, ]+i(?:'ll| will)|next[, ]+i(?:'ll| will)|i can do that)\b/i;
const PLANNING_ONLY_COMPLETION_RE =
@@ -134,6 +142,43 @@ export function resolveIncompleteTurnPayloadText(params: {
: "⚠️ Agent couldn't generate a response. Please try again.";
}
export function resolveReplayInvalidFlag(params: {
attempt: RunLivenessAttempt;
incompleteTurnText?: string | null;
}): boolean {
return (
!params.attempt.replayMetadata.replaySafe ||
params.attempt.promptErrorSource === "compaction" ||
params.attempt.timedOutDuringCompaction ||
Boolean(params.incompleteTurnText)
);
}
export function resolveRunLivenessState(params: {
payloadCount: number;
aborted: boolean;
timedOut: boolean;
attempt: RunLivenessAttempt;
incompleteTurnText?: string | null;
}): EmbeddedRunLivenessState {
if (params.incompleteTurnText) {
return "abandoned";
}
if (
params.attempt.promptErrorSource === "compaction" ||
params.attempt.timedOutDuringCompaction
) {
return "paused";
}
if ((params.aborted || params.timedOut) && params.payloadCount === 0) {
return "blocked";
}
if (params.attempt.lastAssistant?.stopReason === "error") {
return "blocked";
}
return "working";
}
function shouldApplyPlanningOnlyRetryGuard(params: {
provider?: string;
modelId?: string;

View File

@@ -31,12 +31,16 @@ export type EmbeddedPiAgentMeta = {
};
};
export type EmbeddedRunLivenessState = "working" | "paused" | "blocked" | "abandoned";
export type EmbeddedPiRunMeta = {
durationMs: number;
agentMeta?: EmbeddedPiAgentMeta;
aborted?: boolean;
systemPromptReport?: SessionSystemPromptReport;
finalAssistantVisibleText?: string;
replayInvalid?: boolean;
livenessState?: EmbeddedRunLivenessState;
error?: {
kind:
| "context_overflow"

View File

@@ -6,6 +6,7 @@ import { makeZeroUsageSnapshot } from "./usage.js";
export function handleAutoCompactionStart(ctx: EmbeddedPiSubscribeContext) {
ctx.state.compactionInFlight = true;
ctx.state.livenessState = "paused";
ctx.ensureCompactionPromise();
ctx.log.debug(`embedded run compaction start: runId=${ctx.params.runId}`);
emitAgentEvent({
@@ -67,6 +68,7 @@ export function handleAutoCompactionEnd(
ctx.resetForCompactionRetry();
ctx.log.debug(`embedded run compaction retry: runId=${ctx.params.runId}`);
} else {
ctx.state.livenessState = "working";
ctx.maybeResolveCompactionWait();
clearStaleAssistantUsageOnSessionMessages(ctx);
}

View File

@@ -78,6 +78,7 @@ describe("handleAgentEnd", () => {
data: {
phase: "error",
error: "LLM request failed: connection refused by the provider endpoint.",
livenessState: "blocked",
},
});
});
@@ -191,6 +192,24 @@ describe("handleAgentEnd", () => {
expect(ctx.log.debug).toHaveBeenCalledWith("embedded run agent end: runId=run-1 isError=false");
});
it("surfaces replay-invalid paused lifecycle end state when present", async () => {
const onAgentEvent = vi.fn();
const ctx = createContext(undefined, { onAgentEvent });
ctx.state.replayInvalid = true;
ctx.state.livenessState = "paused";
await handleAgentEnd(ctx);
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "lifecycle",
data: {
phase: "end",
livenessState: "paused",
replayInvalid: true,
},
});
});
it("flushes orphaned tool media as a media-only block reply", async () => {
const ctx = createContext(undefined);
ctx.state.pendingToolMediaUrls = ["/tmp/reply.opus"];

View File

@@ -39,6 +39,9 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise<
const lastAssistant = ctx.state.lastAssistant;
const isError = isAssistantMessage(lastAssistant) && lastAssistant.stopReason === "error";
let lifecycleErrorText: string | undefined;
const replayInvalid = ctx.state.replayInvalid === true ? true : undefined;
const livenessState =
ctx.state.livenessState === "working" && isError ? "blocked" : ctx.state.livenessState;
if (isError && lastAssistant) {
const friendlyError = formatAssistantErrorText(lastAssistant, {
@@ -89,6 +92,8 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise<
data: {
phase: "error",
error: lifecycleErrorText ?? "LLM request failed.",
...(livenessState ? { livenessState } : {}),
...(replayInvalid ? { replayInvalid } : {}),
endedAt: Date.now(),
},
});
@@ -97,6 +102,8 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise<
data: {
phase: "error",
error: lifecycleErrorText ?? "LLM request failed.",
...(livenessState ? { livenessState } : {}),
...(replayInvalid ? { replayInvalid } : {}),
},
});
return;
@@ -106,12 +113,18 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise<
stream: "lifecycle",
data: {
phase: "end",
...(livenessState ? { livenessState } : {}),
...(replayInvalid ? { replayInvalid } : {}),
endedAt: Date.now(),
},
});
void ctx.params.onAgentEvent?.({
stream: "lifecycle",
data: { phase: "end" },
data: {
phase: "end",
...(livenessState ? { livenessState } : {}),
...(replayInvalid ? { replayInvalid } : {}),
},
});
};

View File

@@ -6,6 +6,7 @@ import type { HookRunner } from "../plugins/hooks.js";
import type { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js";
import type { MessagingToolSend } from "./pi-embedded-messaging.js";
import type { BlockReplyPayload } from "./pi-embedded-payloads.js";
import type { EmbeddedRunLivenessState } from "./pi-embedded-runner/types.js";
import type {
BlockReplyChunking,
SubscribeEmbeddedPiSessionParams,
@@ -64,6 +65,8 @@ export type EmbeddedPiSubscribeState = {
compactionRetryReject?: (reason?: unknown) => void;
compactionRetryPromise: Promise<void> | null;
unsubscribed: boolean;
replayInvalid?: boolean;
livenessState?: EmbeddedRunLivenessState;
messagingToolSentTexts: string[];
messagingToolSentTextsNormalized: string[];

View File

@@ -104,6 +104,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
compactionRetryReject: undefined,
compactionRetryPromise: null,
unsubscribed: false,
replayInvalid: false,
livenessState: "working",
messagingToolSentTexts: [],
messagingToolSentTextsNormalized: [],
messagingToolSentTargets: [],
@@ -691,6 +693,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
state.pendingToolAudioAsVoice = false;
state.deterministicApprovalPromptPending = false;
state.deterministicApprovalPromptSent = false;
state.replayInvalid = false;
state.livenessState = "working";
resetAssistantMessageState(0);
};