diff --git a/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts b/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts index 58b91f1a1d7..48d9ebd6a65 100644 --- a/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts +++ b/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts @@ -6,6 +6,7 @@ import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vite import type { OpenClawConfig } from "../config/config.js"; import { redactIdentifier } from "../logging/redact-identifier.js"; import type { AuthProfileFailureReason } from "./auth-profiles.js"; +import { buildAttemptReplayMetadata } from "./pi-embedded-runner/run/incomplete-turn.js"; import type { EmbeddedRunAttemptResult } from "./pi-embedded-runner/run/types.js"; const runEmbeddedAttemptMock = vi.fn<(params: unknown) => Promise>(); @@ -174,24 +175,36 @@ const buildAssistant = (overrides: Partial): AssistantMessage ...overrides, }); -const makeAttempt = (overrides: Partial): EmbeddedRunAttemptResult => ({ - aborted: false, - timedOut: false, - timedOutDuringCompaction: false, - promptError: null, - sessionIdUsed: "session:test", - systemPromptReport: undefined, - messagesSnapshot: [], - assistantTexts: [], - toolMetas: [], - lastAssistant: undefined, - didSendViaMessagingTool: false, - messagingToolSentTexts: [], - messagingToolSentMediaUrls: [], - messagingToolSentTargets: [], - cloudCodeAssistFormatError: false, - ...overrides, -}); +const makeAttempt = (overrides: Partial): EmbeddedRunAttemptResult => { + const toolMetas = overrides.toolMetas ?? []; + const didSendViaMessagingTool = overrides.didSendViaMessagingTool ?? false; + const successfulCronAdds = overrides.successfulCronAdds; + return { + aborted: false, + timedOut: false, + timedOutDuringCompaction: false, + promptError: null, + sessionIdUsed: "session:test", + systemPromptReport: undefined, + messagesSnapshot: [], + assistantTexts: [], + toolMetas, + lastAssistant: undefined, + replayMetadata: + overrides.replayMetadata ?? + buildAttemptReplayMetadata({ + toolMetas, + didSendViaMessagingTool, + successfulCronAdds, + }), + didSendViaMessagingTool, + messagingToolSentTexts: [], + messagingToolSentMediaUrls: [], + messagingToolSentTargets: [], + cloudCodeAssistFormatError: false, + ...overrides, + }; +}; const makeConfig = (opts?: { fallbacks?: string[]; diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.fixture.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.fixture.ts index 8c320f765be..b2487777e4e 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.fixture.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.fixture.ts @@ -1,3 +1,4 @@ +import { buildAttemptReplayMetadata } from "./run/incomplete-turn.js"; import type { EmbeddedRunAttemptResult } from "./run/types.js"; export const DEFAULT_OVERFLOW_ERROR_MESSAGE = @@ -28,6 +29,9 @@ export function makeCompactionSuccess(params: { export function makeAttemptResult( overrides: Partial = {}, ): EmbeddedRunAttemptResult { + const toolMetas = overrides.toolMetas ?? []; + const didSendViaMessagingTool = overrides.didSendViaMessagingTool ?? false; + const successfulCronAdds = overrides.successfulCronAdds; return { aborted: false, timedOut: false, @@ -35,10 +39,17 @@ export function makeAttemptResult( promptError: null, sessionIdUsed: "test-session", assistantTexts: ["Hello!"], - toolMetas: [], + toolMetas, lastAssistant: undefined, messagesSnapshot: [], - didSendViaMessagingTool: false, + replayMetadata: + overrides.replayMetadata ?? + buildAttemptReplayMetadata({ + toolMetas, + didSendViaMessagingTool, + successfulCronAdds, + }), + didSendViaMessagingTool, messagingToolSentTexts: [], messagingToolSentMediaUrls: [], messagingToolSentTargets: [], diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 06fc3eb8a6a..9e92007af4b 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -45,7 +45,6 @@ import { extractObservedOverflowTokenCount, type FailoverReason, formatAssistantErrorText, - formatBillingErrorMessage, isAuthAssistantError, isBillingAssistantError, isCompactionFailureError, @@ -53,13 +52,11 @@ import { isFailoverErrorMessage, isLikelyContextOverflowError, isRateLimitAssistantError, - isTimeoutErrorMessage, parseImageDimensionError, parseImageSizeError, pickFallbackThinkingLevel, } from "../pi-embedded-helpers.js"; import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js"; -import { isLikelyMutatingToolName } from "../tool-mutation.js"; import { derivePromptTokens, normalizeUsage, type UsageLike } from "../usage.js"; import { redactRunIdentifier, resolveRunWorkspaceDir } from "../workspace-run.js"; import { runPostCompactionSideEffects } from "./compact.js"; @@ -68,9 +65,11 @@ import { runContextEngineMaintenance } from "./context-engine-maintenance.js"; import { resolveGlobalLane, resolveSessionLane } from "./lanes.js"; import { log } from "./logger.js"; import { resolveModelAsync } from "./model.js"; +import { handleAssistantFailover } from "./run/assistant-failover.js"; import { runEmbeddedAttempt } from "./run/attempt.js"; import { createEmbeddedRunAuthController } from "./run/auth-controller.js"; import { createFailoverDecisionLogger } from "./run/failover-observation.js"; +import { mergeRetryFailoverReason, resolveRunFailoverDecision } from "./run/failover-policy.js"; import { buildErrorAgentMeta, buildUsageAgentMetaFields, @@ -83,8 +82,10 @@ import { type RuntimeAuthState, scrubAnthropicRefusalMagic, } from "./run/helpers.js"; +import { resolveIncompleteTurnPayloadText } from "./run/incomplete-turn.js"; import type { RunEmbeddedPiAgentParams } from "./run/params.js"; import { buildEmbeddedRunPayloads } from "./run/payloads.js"; +import { handleRetryLimitExhaustion } from "./run/retry-limit.js"; import { resolveEffectiveRuntimeModel, resolveHookModelSelection } from "./run/setup.js"; import { sessionLikelyHasOversizedToolResults, @@ -449,44 +450,27 @@ export async function runEmbeddedPiAgent( `provider=${provider}/${modelId} attempts=${runLoopIterations} ` + `maxAttempts=${MAX_RUN_LOOP_ITERATIONS}`, ); - if ( - fallbackConfigured && - lastRetryFailoverReason && - lastRetryFailoverReason !== "timeout" && - lastRetryFailoverReason !== "model_not_found" && - lastRetryFailoverReason !== "format" && - lastRetryFailoverReason !== "session_expired" - ) { - throw new FailoverError(message, { - reason: lastRetryFailoverReason, + const retryLimitDecision = resolveRunFailoverDecision({ + stage: "retry_limit", + fallbackConfigured, + failoverReason: lastRetryFailoverReason, + }); + return handleRetryLimitExhaustion({ + message, + decision: retryLimitDecision, + provider, + model: modelId, + profileId: lastProfileId, + durationMs: Date.now() - started, + agentMeta: buildErrorAgentMeta({ + sessionId: params.sessionId, provider, - model: modelId, - profileId: lastProfileId, - status: resolveFailoverStatus(lastRetryFailoverReason), - }); - } - return { - payloads: [ - { - text: - "Request failed after repeated internal retries. " + - "Please try again, or use /new to start a fresh session.", - isError: true, - }, - ], - meta: { - durationMs: Date.now() - started, - agentMeta: buildErrorAgentMeta({ - sessionId: params.sessionId, - provider, - model: model.id, - usageAccumulator, - lastRunPromptUsage, - lastTurnTotal, - }), - error: { kind: "retry_limit", message }, - }, - }; + model: model.id, + usageAccumulator, + lastRunPromptUsage, + lastTurnTotal, + }), + }); } runLoopIterations += 1; const runtimeAuthRetry = authRetryPending; @@ -1083,16 +1067,36 @@ export async function runEmbeddedPiAgent( logFallbackDecision: logPromptFailoverDecision, }); } + let promptFailoverDecision = resolveRunFailoverDecision({ + stage: "prompt", + aborted, + fallbackConfigured, + failoverFailure: promptFailoverFailure, + failoverReason: promptFailoverReason, + profileRotated: false, + }); if ( - promptFailoverFailure && - promptFailoverReason !== "timeout" && + promptFailoverDecision.action === "rotate_profile" && (await advanceAuthProfile()) ) { - lastRetryFailoverReason = promptFailoverReason ?? lastRetryFailoverReason; + lastRetryFailoverReason = mergeRetryFailoverReason({ + previous: lastRetryFailoverReason, + failoverReason: promptFailoverReason, + }); logPromptFailoverDecision("rotate_profile"); await maybeBackoffBeforeOverloadFailover(promptFailoverReason); continue; } + if (promptFailoverDecision.action === "rotate_profile") { + promptFailoverDecision = resolveRunFailoverDecision({ + stage: "prompt", + aborted, + fallbackConfigured, + failoverFailure: promptFailoverFailure, + failoverReason: promptFailoverReason, + profileRotated: true, + }); + } const fallbackThinking = pickFallbackThinkingLevel({ message: errorText, attempted: attemptedThinking, @@ -1107,22 +1111,23 @@ export async function runEmbeddedPiAgent( // Throw FailoverError for prompt-side failover reasons when fallbacks // are configured so outer model fallback can continue on overload, // rate-limit, auth, or billing failures. - if (fallbackConfigured && promptFailoverFailure) { - const status = resolveFailoverStatus(promptFailoverReason ?? "unknown"); + if (promptFailoverDecision.action === "fallback_model") { + const fallbackReason = promptFailoverDecision.reason ?? "unknown"; + const status = resolveFailoverStatus(fallbackReason); logPromptFailoverDecision("fallback_model", { status }); await maybeBackoffBeforeOverloadFailover(promptFailoverReason); throw ( normalizedPromptFailover ?? new FailoverError(errorText, { - reason: promptFailoverReason ?? "unknown", + reason: fallbackReason, provider, model: modelId, profileId: lastProfileId, - status: resolveFailoverStatus(promptFailoverReason ?? "unknown"), + status, }) ); } - if (promptFailoverFailure || promptFailoverReason) { + if (promptFailoverDecision.action === "surface_error") { logPromptFailoverDecision("surface_error"); } throw promptError; @@ -1194,120 +1199,54 @@ export async function runEmbeddedPiAgent( ); } - // Rotate on timeout to try another account/model path in this turn, - // but exclude post-prompt compaction timeouts (model succeeded; no profile issue). - const shouldRotate = - (!aborted && (failoverFailure || assistantFailoverReason !== null)) || - (timedOut && !timedOutDuringCompaction); - - if (shouldRotate) { - if (lastProfileId) { - const reason = timedOut ? "timeout" : assistantProfileFailureReason; - // Skip cooldown for timeouts: a timeout is model/network-specific, - // not an auth issue. Marking the profile would poison fallback models - // on the same provider (e.g. gpt-5.3 timeout blocks gpt-5.2). - await maybeMarkAuthProfileFailure({ - profileId: lastProfileId, - reason, - modelId, - }); - if (timedOut && !isProbeSession) { - log.warn(`Profile ${lastProfileId} timed out. Trying next account...`); - } - if (cloudCodeAssistFormatError) { - log.warn( - `Profile ${lastProfileId} hit Cloud Code Assist format error. Tool calls will be sanitized on retry.`, - ); - } - } - - // For overloaded errors, check the configured rotation cap *before* - // calling advanceAuthProfile() to avoid a wasted auth-profile setup - // cycle. advanceAuthProfile() runs applyApiKeyInfo() which - // initializes the next profile — costly work that is pointless when - // we already know we will escalate to cross-provider fallback. - // See: https://github.com/openclaw/openclaw/issues/58348 - if (assistantFailoverReason === "overloaded") { - overloadProfileRotations += 1; - if (overloadProfileRotations > overloadProfileRotationLimit && fallbackConfigured) { - const status = resolveFailoverStatus("overloaded"); - log.warn( - `overload profile rotation cap reached for ${sanitizeForLog(provider)}/${sanitizeForLog(modelId)} after ${overloadProfileRotations} rotations; escalating to model fallback`, - ); - logAssistantFailoverDecision("fallback_model", { status }); - throw new FailoverError( - "The AI service is temporarily overloaded. Please try again in a moment.", - { - reason: "overloaded", - provider: activeErrorContext.provider, - model: activeErrorContext.model, - profileId: lastProfileId, - status, - }, - ); - } - } - - // For rate-limit errors, apply the same rotation cap so that - // per-model quota exhaustion (e.g. Anthropic Sonnet-only limits) - // escalates to cross-provider model fallback instead of spinning - // forever across profiles that share the same model quota. - // See: https://github.com/openclaw/openclaw/issues/58572 - if (assistantFailoverReason === "rate_limit") { - maybeEscalateRateLimitProfileFallback({ - failoverProvider: activeErrorContext.provider, - failoverModel: activeErrorContext.model, - logFallbackDecision: logAssistantFailoverDecision, - }); - } - - const rotated = await advanceAuthProfile(); - if (rotated) { - lastRetryFailoverReason = - assistantFailoverReason ?? (timedOut ? "timeout" : null) ?? lastRetryFailoverReason; - logAssistantFailoverDecision("rotate_profile"); - await maybeBackoffBeforeOverloadFailover(assistantFailoverReason); - continue; - } - - if (fallbackConfigured) { - await maybeBackoffBeforeOverloadFailover(assistantFailoverReason); - // Prefer formatted error message (user-friendly) over raw errorMessage - const message = - (lastAssistant - ? formatAssistantErrorText(lastAssistant, { - cfg: params.config, - sessionKey: params.sessionKey ?? params.sessionId, - provider: activeErrorContext.provider, - model: activeErrorContext.model, - }) - : undefined) || - lastAssistant?.errorMessage?.trim() || - (timedOut - ? "LLM request timed out." - : rateLimitFailure - ? "LLM request rate limited." - : billingFailure - ? formatBillingErrorMessage( - activeErrorContext.provider, - activeErrorContext.model, - ) - : authFailure - ? "LLM request unauthorized." - : "LLM request failed."); - const status = - resolveFailoverStatus(assistantFailoverReason ?? "unknown") ?? - (isTimeoutErrorMessage(message) ? 408 : undefined); - logAssistantFailoverDecision("fallback_model", { status }); - throw new FailoverError(message, { - reason: assistantFailoverReason ?? "unknown", - provider: activeErrorContext.provider, - model: activeErrorContext.model, - profileId: lastProfileId, - status, - }); - } - logAssistantFailoverDecision("surface_error"); + const assistantFailoverDecision = resolveRunFailoverDecision({ + stage: "assistant", + aborted, + fallbackConfigured, + failoverFailure, + failoverReason: assistantFailoverReason, + timedOut, + timedOutDuringCompaction, + profileRotated: false, + }); + const assistantFailoverOutcome = await handleAssistantFailover({ + initialDecision: assistantFailoverDecision, + aborted, + fallbackConfigured, + failoverFailure, + failoverReason: assistantFailoverReason, + timedOut, + timedOutDuringCompaction, + assistantProfileFailureReason, + lastProfileId, + modelId, + provider, + activeErrorContext, + lastAssistant, + config: params.config, + sessionKey: params.sessionKey ?? params.sessionId, + authFailure, + rateLimitFailure, + billingFailure, + cloudCodeAssistFormatError, + isProbeSession, + overloadProfileRotations, + overloadProfileRotationLimit, + previousRetryFailoverReason: lastRetryFailoverReason, + logAssistantFailoverDecision, + warn: (message) => log.warn(message), + maybeMarkAuthProfileFailure, + maybeEscalateRateLimitProfileFallback, + maybeBackoffBeforeOverloadFailover, + advanceAuthProfile, + }); + overloadProfileRotations = assistantFailoverOutcome.overloadProfileRotations; + if (assistantFailoverOutcome.action === "retry") { + lastRetryFailoverReason = assistantFailoverOutcome.lastRetryFailoverReason; + continue; + } + if (assistantFailoverOutcome.action === "throw") { + throw assistantFailoverOutcome.error; } const usageMeta = buildUsageAgentMetaFields({ @@ -1373,83 +1312,50 @@ export async function runEmbeddedPiAgent( }; } - // Detect incomplete turns where prompt() resolved prematurely due to - // pi-agent-core's auto-retry timing issue: when a mid-turn 429/overload - // triggers an internal retry, waitForRetry() resolves on the next - // assistant message *before* tool execution completes in the retried - // loop (see #8643). The captured lastAssistant has a non-terminal - // stopReason (e.g. "toolUse") with no text content, producing empty - // payloads. Surface an error instead of silently dropping the reply. - // - // Exclusions: - // - didSendDeterministicApprovalPrompt: approval-prompt turns - // intentionally produce empty payloads with stopReason=toolUse - // - lastToolError: suppressed/recoverable tool failures also produce - // empty payloads with stopReason=toolUse; those are handled by - // buildEmbeddedRunPayloads' own warning policy - if ( - payloads.length === 0 && - !aborted && - !timedOut && - !attempt.clientToolCall && - !attempt.yieldDetected && - !attempt.didSendDeterministicApprovalPrompt && - !attempt.lastToolError - ) { - const incompleteStopReason = lastAssistant?.stopReason; - // Only trigger for non-terminal stop reasons (toolUse, etc.) to - // avoid false positives when the model legitimately produces no text. - // StopReason union: "aborted" | "error" | "length" | "toolUse" - // "toolUse" is the key signal that prompt() resolved mid-turn. - if (incompleteStopReason === "toolUse" || incompleteStopReason === "error") { - log.warn( - `incomplete turn detected: runId=${params.runId} sessionId=${params.sessionId} ` + - `stopReason=${incompleteStopReason} payloads=0 — surfacing error to user`, - ); + // Detect incomplete turns where prompt() resolved prematurely and the + // runner would otherwise drop an empty reply. + const incompleteTurnText = resolveIncompleteTurnPayloadText({ + payloadCount: payloads.length, + aborted, + timedOut, + attempt, + }); + if (incompleteTurnText) { + const incompleteStopReason = attempt.lastAssistant?.stopReason; + log.warn( + `incomplete turn detected: runId=${params.runId} sessionId=${params.sessionId} ` + + `stopReason=${incompleteStopReason} payloads=0 — surfacing error to user`, + ); - // Mark the failing profile for cooldown so multi-profile setups - // rotate away from the exhausted credential on the next turn. - if (lastProfileId) { - await maybeMarkAuthProfileFailure({ - profileId: lastProfileId, - reason: resolveAuthProfileFailureReason(assistantFailoverReason), - }); - } - - // Warn about potential side-effects when the interrupted turn may - // already have mutated state or sent outbound actions. - const hadMutatingTools = attempt.toolMetas.some((t) => - isLikelyMutatingToolName(t.toolName), - ); - const hadPotentialSideEffects = - hadMutatingTools || - attempt.didSendViaMessagingTool || - (attempt.successfulCronAdds ?? 0) > 0; - const errorText = hadPotentialSideEffects - ? "⚠️ Agent couldn't generate a response. Note: some tool actions may have already been executed — please verify before retrying." - : "⚠️ Agent couldn't generate a response. Please try again."; - - return { - payloads: [ - { - text: errorText, - isError: true, - }, - ], - meta: { - durationMs: Date.now() - started, - agentMeta, - aborted, - systemPromptReport: attempt.systemPromptReport, - }, - didSendViaMessagingTool: attempt.didSendViaMessagingTool, - didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt, - messagingToolSentTexts: attempt.messagingToolSentTexts, - messagingToolSentMediaUrls: attempt.messagingToolSentMediaUrls, - messagingToolSentTargets: attempt.messagingToolSentTargets, - successfulCronAdds: attempt.successfulCronAdds, - }; + // Mark the failing profile for cooldown so multi-profile setups + // rotate away from the exhausted credential on the next turn. + if (lastProfileId) { + await maybeMarkAuthProfileFailure({ + profileId: lastProfileId, + reason: resolveAuthProfileFailureReason(assistantFailoverReason), + }); } + + return { + payloads: [ + { + text: incompleteTurnText, + isError: true, + }, + ], + meta: { + durationMs: Date.now() - started, + agentMeta, + aborted, + systemPromptReport: attempt.systemPromptReport, + }, + didSendViaMessagingTool: attempt.didSendViaMessagingTool, + didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt, + messagingToolSentTexts: attempt.messagingToolSentTexts, + messagingToolSentMediaUrls: attempt.messagingToolSentMediaUrls, + messagingToolSentTargets: attempt.messagingToolSentTargets, + successfulCronAdds: attempt.successfulCronAdds, + }; } log.debug( diff --git a/src/agents/pi-embedded-runner/run/assistant-failover.ts b/src/agents/pi-embedded-runner/run/assistant-failover.ts new file mode 100644 index 00000000000..18a71527109 --- /dev/null +++ b/src/agents/pi-embedded-runner/run/assistant-failover.ts @@ -0,0 +1,208 @@ +import type { AssistantMessage } from "@mariozechner/pi-ai"; +import type { OpenClawConfig } from "../../../config/config.js"; +import { sanitizeForLog } from "../../../terminal/ansi.js"; +import type { AuthProfileFailureReason } from "../../auth-profiles.js"; +import { FailoverError, resolveFailoverStatus } from "../../failover-error.js"; +import { + formatAssistantErrorText, + formatBillingErrorMessage, + isTimeoutErrorMessage, + type FailoverReason, +} from "../../pi-embedded-helpers.js"; +import { + mergeRetryFailoverReason, + resolveRunFailoverDecision, + type AssistantFailoverDecision, +} from "./failover-policy.js"; + +type AssistantFailoverOutcome = + | { + action: "continue_normal"; + overloadProfileRotations: number; + } + | { + action: "retry"; + overloadProfileRotations: number; + lastRetryFailoverReason: FailoverReason | null; + } + | { + action: "throw"; + overloadProfileRotations: number; + error: FailoverError; + }; + +export async function handleAssistantFailover(params: { + initialDecision: AssistantFailoverDecision; + aborted: boolean; + fallbackConfigured: boolean; + failoverFailure: boolean; + failoverReason: FailoverReason | null; + timedOut: boolean; + timedOutDuringCompaction: boolean; + assistantProfileFailureReason: AuthProfileFailureReason | null; + lastProfileId?: string; + modelId: string; + provider: string; + activeErrorContext: { provider: string; model: string }; + lastAssistant: AssistantMessage | undefined; + config: OpenClawConfig | undefined; + sessionKey?: string; + authFailure: boolean; + rateLimitFailure: boolean; + billingFailure: boolean; + cloudCodeAssistFormatError: boolean; + isProbeSession: boolean; + overloadProfileRotations: number; + overloadProfileRotationLimit: number; + previousRetryFailoverReason: FailoverReason | null; + logAssistantFailoverDecision: ( + decision: "rotate_profile" | "fallback_model" | "surface_error", + extra?: { status?: number }, + ) => void; + warn: (message: string) => void; + maybeMarkAuthProfileFailure: (failure: { + profileId?: string; + reason?: AuthProfileFailureReason | null; + modelId?: string; + }) => Promise; + maybeEscalateRateLimitProfileFallback: (params: { + failoverProvider: string; + failoverModel: string; + logFallbackDecision: (decision: "fallback_model", extra?: { status?: number }) => void; + }) => void; + maybeBackoffBeforeOverloadFailover: (reason: FailoverReason | null) => Promise; + advanceAuthProfile: () => Promise; +}): Promise { + let overloadProfileRotations = params.overloadProfileRotations; + let decision = params.initialDecision; + + if (decision.action === "rotate_profile") { + if (params.lastProfileId) { + const reason = params.timedOut ? "timeout" : params.assistantProfileFailureReason; + await params.maybeMarkAuthProfileFailure({ + profileId: params.lastProfileId, + reason, + modelId: params.modelId, + }); + if (params.timedOut && !params.isProbeSession) { + params.warn(`Profile ${params.lastProfileId} timed out. Trying next account...`); + } + if (params.cloudCodeAssistFormatError) { + params.warn( + `Profile ${params.lastProfileId} hit Cloud Code Assist format error. Tool calls will be sanitized on retry.`, + ); + } + } + + if (params.failoverReason === "overloaded") { + overloadProfileRotations += 1; + if ( + overloadProfileRotations > params.overloadProfileRotationLimit && + params.fallbackConfigured + ) { + const status = resolveFailoverStatus("overloaded"); + params.warn( + `overload profile rotation cap reached for ${sanitizeForLog(params.provider)}/${sanitizeForLog(params.modelId)} after ${overloadProfileRotations} rotations; escalating to model fallback`, + ); + params.logAssistantFailoverDecision("fallback_model", { status }); + return { + action: "throw", + overloadProfileRotations, + error: new FailoverError( + "The AI service is temporarily overloaded. Please try again in a moment.", + { + reason: "overloaded", + provider: params.activeErrorContext.provider, + model: params.activeErrorContext.model, + profileId: params.lastProfileId, + status, + }, + ), + }; + } + } + + if (params.failoverReason === "rate_limit") { + params.maybeEscalateRateLimitProfileFallback({ + failoverProvider: params.activeErrorContext.provider, + failoverModel: params.activeErrorContext.model, + logFallbackDecision: params.logAssistantFailoverDecision, + }); + } + + const rotated = await params.advanceAuthProfile(); + if (rotated) { + params.logAssistantFailoverDecision("rotate_profile"); + await params.maybeBackoffBeforeOverloadFailover(params.failoverReason); + return { + action: "retry", + overloadProfileRotations, + lastRetryFailoverReason: mergeRetryFailoverReason({ + previous: params.previousRetryFailoverReason, + failoverReason: params.failoverReason, + timedOut: params.timedOut, + }), + }; + } + + decision = resolveRunFailoverDecision({ + stage: "assistant", + aborted: params.aborted, + fallbackConfigured: params.fallbackConfigured, + failoverFailure: params.failoverFailure, + failoverReason: params.failoverReason, + timedOut: params.timedOut, + timedOutDuringCompaction: params.timedOutDuringCompaction, + profileRotated: true, + }); + } + + if (decision.action === "fallback_model") { + await params.maybeBackoffBeforeOverloadFailover(params.failoverReason); + const message = + (params.lastAssistant + ? formatAssistantErrorText(params.lastAssistant, { + cfg: params.config, + sessionKey: params.sessionKey, + provider: params.activeErrorContext.provider, + model: params.activeErrorContext.model, + }) + : undefined) || + params.lastAssistant?.errorMessage?.trim() || + (params.timedOut + ? "LLM request timed out." + : params.rateLimitFailure + ? "LLM request rate limited." + : params.billingFailure + ? formatBillingErrorMessage( + params.activeErrorContext.provider, + params.activeErrorContext.model, + ) + : params.authFailure + ? "LLM request unauthorized." + : "LLM request failed."); + const status = + resolveFailoverStatus(decision.reason) ?? (isTimeoutErrorMessage(message) ? 408 : undefined); + params.logAssistantFailoverDecision("fallback_model", { status }); + return { + action: "throw", + overloadProfileRotations, + error: new FailoverError(message, { + reason: decision.reason, + provider: params.activeErrorContext.provider, + model: params.activeErrorContext.model, + profileId: params.lastProfileId, + status, + }), + }; + } + + if (decision.action === "surface_error") { + params.logAssistantFailoverDecision("surface_error"); + } + + return { + action: "continue_normal", + overloadProfileRotations, + }; +} diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 33e1f5e1d85..bf06cc0ca46 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -180,6 +180,7 @@ import { } from "./compaction-timeout.js"; import { pruneProcessedHistoryImages } from "./history-image-prune.js"; import { detectAndLoadPromptImages } from "./images.js"; +import { buildAttemptReplayMetadata } from "./incomplete-turn.js"; import { resolveLlmIdleTimeoutMs, streamWithIdleTimeout } from "./llm-idle-timeout.js"; import type { EmbeddedRunAttemptParams, EmbeddedRunAttemptResult } from "./types.js"; @@ -1934,6 +1935,11 @@ export async function runEmbeddedAttempt( } return { + replayMetadata: buildAttemptReplayMetadata({ + toolMetas: toolMetasNormalized, + didSendViaMessagingTool: didSendViaMessagingTool(), + successfulCronAdds: getSuccessfulCronAdds(), + }), aborted, timedOut, timedOutDuringCompaction, diff --git a/src/agents/pi-embedded-runner/run/failover-policy.test.ts b/src/agents/pi-embedded-runner/run/failover-policy.test.ts new file mode 100644 index 00000000000..47c4b13fd8e --- /dev/null +++ b/src/agents/pi-embedded-runner/run/failover-policy.test.ts @@ -0,0 +1,135 @@ +import { describe, expect, it } from "vitest"; +import { mergeRetryFailoverReason, resolveRunFailoverDecision } from "./failover-policy.js"; + +describe("resolveRunFailoverDecision", () => { + it("escalates retry-limit exhaustion for replay-safe failover reasons", () => { + expect( + resolveRunFailoverDecision({ + stage: "retry_limit", + fallbackConfigured: true, + failoverReason: "rate_limit", + }), + ).toEqual({ + action: "fallback_model", + reason: "rate_limit", + }); + }); + + it("keeps retry-limit as a local error for non-escalating reasons", () => { + expect( + resolveRunFailoverDecision({ + stage: "retry_limit", + fallbackConfigured: true, + failoverReason: "timeout", + }), + ).toEqual({ + action: "return_error_payload", + }); + }); + + it("prefers prompt-side profile rotation before fallback", () => { + expect( + resolveRunFailoverDecision({ + stage: "prompt", + aborted: false, + fallbackConfigured: true, + failoverFailure: true, + failoverReason: "rate_limit", + profileRotated: false, + }), + ).toEqual({ + action: "rotate_profile", + reason: "rate_limit", + }); + }); + + it("falls back after prompt rotation is exhausted", () => { + expect( + resolveRunFailoverDecision({ + stage: "prompt", + aborted: false, + fallbackConfigured: true, + failoverFailure: true, + failoverReason: "rate_limit", + profileRotated: true, + }), + ).toEqual({ + action: "fallback_model", + reason: "rate_limit", + }); + }); + + it("treats classified assistant-side 429s as rotation candidates even without error stopReason", () => { + expect( + resolveRunFailoverDecision({ + stage: "assistant", + aborted: false, + fallbackConfigured: true, + failoverFailure: false, + failoverReason: "rate_limit", + timedOut: false, + timedOutDuringCompaction: false, + profileRotated: false, + }), + ).toEqual({ + action: "rotate_profile", + reason: "rate_limit", + }); + }); + + it("falls back after assistant rotation is exhausted", () => { + expect( + resolveRunFailoverDecision({ + stage: "assistant", + aborted: false, + fallbackConfigured: true, + failoverFailure: false, + failoverReason: "rate_limit", + timedOut: false, + timedOutDuringCompaction: false, + profileRotated: true, + }), + ).toEqual({ + action: "fallback_model", + reason: "rate_limit", + }); + }); + + it("does nothing for assistant turns without failover signals", () => { + expect( + resolveRunFailoverDecision({ + stage: "assistant", + aborted: false, + fallbackConfigured: true, + failoverFailure: false, + failoverReason: null, + timedOut: false, + timedOutDuringCompaction: false, + profileRotated: false, + }), + ).toEqual({ + action: "continue_normal", + }); + }); +}); + +describe("mergeRetryFailoverReason", () => { + it("preserves the previous classified reason when the current one is null", () => { + expect( + mergeRetryFailoverReason({ + previous: "rate_limit", + failoverReason: null, + }), + ).toBe("rate_limit"); + }); + + it("records timeout when no classified reason is present", () => { + expect( + mergeRetryFailoverReason({ + previous: null, + failoverReason: null, + timedOut: true, + }), + ).toBe("timeout"); + }); +}); diff --git a/src/agents/pi-embedded-runner/run/failover-policy.ts b/src/agents/pi-embedded-runner/run/failover-policy.ts new file mode 100644 index 00000000000..0b8c8233615 --- /dev/null +++ b/src/agents/pi-embedded-runner/run/failover-policy.ts @@ -0,0 +1,163 @@ +import type { FailoverReason } from "../../pi-embedded-helpers.js"; + +export type RunFailoverDecisionAction = + | "continue_normal" + | "rotate_profile" + | "fallback_model" + | "surface_error" + | "return_error_payload"; + +export type RunFailoverDecision = + | { + action: "continue_normal"; + } + | { + action: "rotate_profile" | "surface_error"; + reason: FailoverReason | null; + } + | { + action: "fallback_model"; + reason: FailoverReason; + } + | { + action: "return_error_payload"; + }; + +export type RetryLimitFailoverDecision = Extract< + RunFailoverDecision, + { action: "fallback_model" | "return_error_payload" } +>; + +export type PromptFailoverDecision = Extract< + RunFailoverDecision, + { action: "rotate_profile" | "fallback_model" | "surface_error" } +>; + +export type AssistantFailoverDecision = Extract< + RunFailoverDecision, + { action: "continue_normal" | "rotate_profile" | "fallback_model" | "surface_error" } +>; + +type RetryLimitDecisionParams = { + stage: "retry_limit"; + fallbackConfigured: boolean; + failoverReason: FailoverReason | null; +}; + +type PromptDecisionParams = { + stage: "prompt"; + aborted: boolean; + fallbackConfigured: boolean; + failoverFailure: boolean; + failoverReason: FailoverReason | null; + profileRotated: boolean; +}; + +type AssistantDecisionParams = { + stage: "assistant"; + aborted: boolean; + fallbackConfigured: boolean; + failoverFailure: boolean; + failoverReason: FailoverReason | null; + timedOut: boolean; + timedOutDuringCompaction: boolean; + profileRotated: boolean; +}; + +export type RunFailoverDecisionParams = + | RetryLimitDecisionParams + | PromptDecisionParams + | AssistantDecisionParams; + +function shouldEscalateRetryLimit(reason: FailoverReason | null): boolean { + return Boolean( + reason && + reason !== "timeout" && + reason !== "model_not_found" && + reason !== "format" && + reason !== "session_expired", + ); +} + +function shouldRotatePrompt(params: PromptDecisionParams): boolean { + return params.failoverFailure && params.failoverReason !== "timeout"; +} + +function shouldRotateAssistant(params: AssistantDecisionParams): boolean { + return ( + (!params.aborted && (params.failoverFailure || params.failoverReason !== null)) || + (params.timedOut && !params.timedOutDuringCompaction) + ); +} + +export function mergeRetryFailoverReason(params: { + previous: FailoverReason | null; + failoverReason: FailoverReason | null; + timedOut?: boolean; +}): FailoverReason | null { + return params.failoverReason ?? (params.timedOut ? "timeout" : null) ?? params.previous; +} + +export function resolveRunFailoverDecision( + params: RetryLimitDecisionParams, +): RetryLimitFailoverDecision; +export function resolveRunFailoverDecision(params: PromptDecisionParams): PromptFailoverDecision; +export function resolveRunFailoverDecision( + params: AssistantDecisionParams, +): AssistantFailoverDecision; +export function resolveRunFailoverDecision(params: RunFailoverDecisionParams): RunFailoverDecision { + if (params.stage === "retry_limit") { + if (params.fallbackConfigured && shouldEscalateRetryLimit(params.failoverReason)) { + const fallbackReason = params.failoverReason ?? "unknown"; + return { + action: "fallback_model", + reason: fallbackReason, + }; + } + return { + action: "return_error_payload", + }; + } + + if (params.stage === "prompt") { + if (!params.profileRotated && shouldRotatePrompt(params)) { + return { + action: "rotate_profile", + reason: params.failoverReason, + }; + } + if (params.fallbackConfigured && params.failoverFailure) { + return { + action: "fallback_model", + reason: params.failoverReason ?? "unknown", + }; + } + return { + action: "surface_error", + reason: params.failoverReason, + }; + } + + const assistantShouldRotate = shouldRotateAssistant(params); + if (!params.profileRotated && assistantShouldRotate) { + return { + action: "rotate_profile", + reason: params.failoverReason, + }; + } + if (assistantShouldRotate && params.fallbackConfigured) { + return { + action: "fallback_model", + reason: params.timedOut ? "timeout" : (params.failoverReason ?? "unknown"), + }; + } + if (!assistantShouldRotate) { + return { + action: "continue_normal", + }; + } + return { + action: "surface_error", + reason: params.failoverReason, + }; +} diff --git a/src/agents/pi-embedded-runner/run/incomplete-turn.ts b/src/agents/pi-embedded-runner/run/incomplete-turn.ts new file mode 100644 index 00000000000..25ea8aa1c37 --- /dev/null +++ b/src/agents/pi-embedded-runner/run/incomplete-turn.ts @@ -0,0 +1,57 @@ +import { isLikelyMutatingToolName } from "../../tool-mutation.js"; +import type { EmbeddedRunAttemptResult } from "./types.js"; + +type ReplayMetadataAttempt = Pick< + EmbeddedRunAttemptResult, + "toolMetas" | "didSendViaMessagingTool" | "successfulCronAdds" +>; + +type IncompleteTurnAttempt = Pick< + EmbeddedRunAttemptResult, + | "clientToolCall" + | "yieldDetected" + | "didSendDeterministicApprovalPrompt" + | "lastToolError" + | "lastAssistant" + | "replayMetadata" +>; + +export function buildAttemptReplayMetadata( + params: ReplayMetadataAttempt, +): EmbeddedRunAttemptResult["replayMetadata"] { + const hadMutatingTools = params.toolMetas.some((t) => isLikelyMutatingToolName(t.toolName)); + const hadPotentialSideEffects = + hadMutatingTools || params.didSendViaMessagingTool || (params.successfulCronAdds ?? 0) > 0; + return { + hadPotentialSideEffects, + replaySafe: !hadPotentialSideEffects, + }; +} + +export function resolveIncompleteTurnPayloadText(params: { + payloadCount: number; + aborted: boolean; + timedOut: boolean; + attempt: IncompleteTurnAttempt; +}): string | null { + if ( + params.payloadCount !== 0 || + params.aborted || + params.timedOut || + params.attempt.clientToolCall || + params.attempt.yieldDetected || + params.attempt.didSendDeterministicApprovalPrompt || + params.attempt.lastToolError + ) { + return null; + } + + const stopReason = params.attempt.lastAssistant?.stopReason; + if (stopReason !== "toolUse" && stopReason !== "error") { + return null; + } + + return params.attempt.replayMetadata.hadPotentialSideEffects + ? "⚠️ Agent couldn't generate a response. Note: some tool actions may have already been executed — please verify before retrying." + : "⚠️ Agent couldn't generate a response. Please try again."; +} diff --git a/src/agents/pi-embedded-runner/run/retry-limit.ts b/src/agents/pi-embedded-runner/run/retry-limit.ts new file mode 100644 index 00000000000..2fbdc23c51d --- /dev/null +++ b/src/agents/pi-embedded-runner/run/retry-limit.ts @@ -0,0 +1,39 @@ +import { FailoverError, resolveFailoverStatus } from "../../failover-error.js"; +import type { EmbeddedPiAgentMeta, EmbeddedPiRunResult } from "../types.js"; +import type { RetryLimitFailoverDecision } from "./failover-policy.js"; + +export function handleRetryLimitExhaustion(params: { + message: string; + decision: RetryLimitFailoverDecision; + provider: string; + model: string; + profileId?: string; + durationMs: number; + agentMeta: EmbeddedPiAgentMeta; +}): EmbeddedPiRunResult { + if (params.decision.action === "fallback_model") { + throw new FailoverError(params.message, { + reason: params.decision.reason, + provider: params.provider, + model: params.model, + profileId: params.profileId, + status: resolveFailoverStatus(params.decision.reason), + }); + } + + return { + payloads: [ + { + text: + "Request failed after repeated internal retries. " + + "Please try again, or use /new to start a fresh session.", + isError: true, + }, + ], + meta: { + durationMs: params.durationMs, + agentMeta: params.agentMeta, + error: { kind: "retry_limit", message: params.message }, + }, + }; +} diff --git a/src/agents/pi-embedded-runner/run/types.ts b/src/agents/pi-embedded-runner/run/types.ts index 89ec7161657..1664d8442d5 100644 --- a/src/agents/pi-embedded-runner/run/types.ts +++ b/src/agents/pi-embedded-runner/run/types.ts @@ -61,4 +61,8 @@ export type EmbeddedRunAttemptResult = { clientToolCall?: { name: string; params: Record }; /** True when sessions_yield tool was called during this attempt. */ yieldDetected?: boolean; + replayMetadata: { + hadPotentialSideEffects: boolean; + replaySafe: boolean; + }; }; diff --git a/src/agents/pi-embedded-runner/usage-reporting.test.ts b/src/agents/pi-embedded-runner/usage-reporting.test.ts index 88c10107e6b..dd165b81b1a 100644 --- a/src/agents/pi-embedded-runner/usage-reporting.test.ts +++ b/src/agents/pi-embedded-runner/usage-reporting.test.ts @@ -5,6 +5,7 @@ import { mockedEnsureRuntimePluginsLoaded, mockedRunEmbeddedAttempt, } from "./run.overflow-compaction.harness.js"; +import { buildAttemptReplayMetadata } from "./run/incomplete-turn.js"; import type { EmbeddedRunAttemptResult } from "./run/types.js"; let runEmbeddedPiAgent: typeof import("./run.js").runEmbeddedPiAgent; @@ -12,6 +13,9 @@ let runEmbeddedPiAgent: typeof import("./run.js").runEmbeddedPiAgent; function makeAttemptResult( overrides: Partial = {}, ): EmbeddedRunAttemptResult { + const toolMetas = overrides.toolMetas ?? []; + const didSendViaMessagingTool = overrides.didSendViaMessagingTool ?? false; + const successfulCronAdds = overrides.successfulCronAdds; return { aborted: false, timedOut: false, @@ -20,9 +24,16 @@ function makeAttemptResult( sessionIdUsed: "test-session", messagesSnapshot: [], assistantTexts: [], - toolMetas: [], + toolMetas, lastAssistant: undefined, - didSendViaMessagingTool: false, + replayMetadata: + overrides.replayMetadata ?? + buildAttemptReplayMetadata({ + toolMetas, + didSendViaMessagingTool, + successfulCronAdds, + }), + didSendViaMessagingTool, messagingToolSentTexts: [], messagingToolSentMediaUrls: [], messagingToolSentTargets: [], diff --git a/src/agents/test-helpers/pi-embedded-runner-e2e-fixtures.ts b/src/agents/test-helpers/pi-embedded-runner-e2e-fixtures.ts index ac6fec7691e..843b0e75661 100644 --- a/src/agents/test-helpers/pi-embedded-runner-e2e-fixtures.ts +++ b/src/agents/test-helpers/pi-embedded-runner-e2e-fixtures.ts @@ -3,6 +3,7 @@ import os from "node:os"; import path from "node:path"; import type { AssistantMessage } from "@mariozechner/pi-ai"; import type { OpenClawConfig } from "../../config/config.js"; +import { buildAttemptReplayMetadata } from "../pi-embedded-runner/run/incomplete-turn.js"; import type { EmbeddedRunAttemptResult } from "../pi-embedded-runner/run/types.js"; export type EmbeddedPiRunnerTestWorkspace = { @@ -96,6 +97,9 @@ export function buildEmbeddedRunnerAssistant( export function makeEmbeddedRunnerAttempt( overrides: Partial, ): EmbeddedRunAttemptResult { + const toolMetas = overrides.toolMetas ?? []; + const didSendViaMessagingTool = overrides.didSendViaMessagingTool ?? false; + const successfulCronAdds = overrides.successfulCronAdds; return { aborted: false, timedOut: false, @@ -105,9 +109,16 @@ export function makeEmbeddedRunnerAttempt( systemPromptReport: undefined, messagesSnapshot: [], assistantTexts: [], - toolMetas: [], + toolMetas, lastAssistant: undefined, - didSendViaMessagingTool: false, + replayMetadata: + overrides.replayMetadata ?? + buildAttemptReplayMetadata({ + toolMetas, + didSendViaMessagingTool, + successfulCronAdds, + }), + didSendViaMessagingTool, messagingToolSentTexts: [], messagingToolSentMediaUrls: [], messagingToolSentTargets: [],