mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-29 02:41:07 +00:00
refactor: split embedded run helpers
This commit is contained in:
@@ -5,8 +5,7 @@ import {
|
||||
ensureContextEnginesInitialized,
|
||||
resolveContextEngine,
|
||||
} from "../../context-engine/index.js";
|
||||
import { type BackoffPolicy, computeBackoff, sleepWithAbort } from "../../infra/backoff.js";
|
||||
import { generateSecureToken } from "../../infra/secure-random.js";
|
||||
import { computeBackoff, sleepWithAbort } from "../../infra/backoff.js";
|
||||
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
|
||||
import { prepareProviderRuntimeAuth } from "../../plugins/provider-runtime.js";
|
||||
import type { PluginHookBeforeAgentStartResult } from "../../plugins/types.js";
|
||||
@@ -76,6 +75,19 @@ import { log } from "./logger.js";
|
||||
import { resolveModelAsync } from "./model.js";
|
||||
import { runEmbeddedAttempt } from "./run/attempt.js";
|
||||
import { createFailoverDecisionLogger } from "./run/failover-observation.js";
|
||||
import {
|
||||
buildErrorAgentMeta,
|
||||
buildUsageAgentMetaFields,
|
||||
createCompactionDiagId,
|
||||
OVERLOAD_FAILOVER_BACKOFF_POLICY,
|
||||
resolveActiveErrorContext,
|
||||
resolveMaxRunRetryIterations,
|
||||
RUNTIME_AUTH_REFRESH_MARGIN_MS,
|
||||
RUNTIME_AUTH_REFRESH_MIN_DELAY_MS,
|
||||
RUNTIME_AUTH_REFRESH_RETRY_MS,
|
||||
type RuntimeAuthState,
|
||||
scrubAnthropicRefusalMagic,
|
||||
} from "./run/helpers.js";
|
||||
import type { RunEmbeddedPiAgentParams } from "./run/params.js";
|
||||
import { buildEmbeddedRunPayloads } from "./run/payloads.js";
|
||||
import {
|
||||
@@ -83,134 +95,11 @@ import {
|
||||
truncateOversizedToolResultsInSession,
|
||||
} from "./tool-result-truncation.js";
|
||||
import type { EmbeddedPiAgentMeta, EmbeddedPiRunResult } from "./types.js";
|
||||
import {
|
||||
createUsageAccumulator,
|
||||
mergeUsageIntoAccumulator,
|
||||
resolveLastCallUsage,
|
||||
toNormalizedUsage,
|
||||
type UsageAccumulator,
|
||||
} from "./usage-accumulator.js";
|
||||
import { createUsageAccumulator, mergeUsageIntoAccumulator } from "./usage-accumulator.js";
|
||||
import { describeUnknownError } from "./utils.js";
|
||||
|
||||
type ApiKeyInfo = ResolvedProviderAuth;
|
||||
|
||||
/**
 * Bookkeeping record for runtime-resolved provider auth.
 *
 * NOTE(review): the per-field notes are read off the field names and types
 * only; the code that fills this record is outside this view — confirm
 * before relying on them.
 */
type RuntimeAuthState = {
  /** Presumably the API key the runtime credential was derived from — confirm. */
  sourceApiKey: string;
  /** Auth mode label; the set of valid values is not visible here. */
  authMode: string;
  /** Optional auth profile identifier. */
  profileId?: string;
  /** Optional expiry; presumably epoch milliseconds — TODO confirm unit. */
  expiresAt?: number;
  /** Timer handle as returned by `setTimeout`; presumably the scheduled refresh. */
  refreshTimer?: ReturnType<typeof setTimeout>;
  /** Promise that settles with no value; presumably the refresh currently running. */
  refreshInFlight?: Promise<void>;
};
|
||||
|
||||
// Runtime-auth refresh timings, all in milliseconds.
// NOTE(review): how each value is applied is decided by code outside this view.

/** Five minutes. */
const RUNTIME_AUTH_REFRESH_MARGIN_MS = 5 * 60 * 1000;
/** One minute. */
const RUNTIME_AUTH_REFRESH_RETRY_MS = 60 * 1000;
/** Five seconds. */
const RUNTIME_AUTH_REFRESH_MIN_DELAY_MS = 5 * 1000;
|
||||
// Keep overload pacing noticeable enough to avoid tight retry bursts, but short
// enough that fallback still feels responsive within a single turn.
const OVERLOAD_FAILOVER_BACKOFF_POLICY: BackoffPolicy = {
  initialMs: 250,
  maxMs: 1_500,
  factor: 2,
  jitter: 0.2,
};
|
||||
|
||||
// Avoid Anthropic's refusal test token poisoning session transcripts.
const ANTHROPIC_MAGIC_STRING_TRIGGER_REFUSAL = "ANTHROPIC_MAGIC_STRING_TRIGGER_REFUSAL";
const ANTHROPIC_MAGIC_STRING_REPLACEMENT = "ANTHROPIC MAGIC STRING TRIGGER REFUSAL (redacted)";

/**
 * Replace every occurrence of the refusal trigger token in `prompt` with a
 * redacted, space-separated placeholder.
 *
 * @param prompt Text to scrub.
 * @returns `prompt` itself when the token is absent, otherwise a copy with
 *   all occurrences replaced.
 */
function scrubAnthropicRefusalMagic(prompt: string): string {
  // Fast path: hand back the original string untouched when there is nothing to scrub.
  if (!prompt.includes(ANTHROPIC_MAGIC_STRING_TRIGGER_REFUSAL)) {
    return prompt;
  }
  return prompt.replaceAll(
    ANTHROPIC_MAGIC_STRING_TRIGGER_REFUSAL,
    ANTHROPIC_MAGIC_STRING_REPLACEMENT,
  );
}
|
||||
/**
 * Create a diagnostic id of the form `ovf-<time>-<token>`, where `<time>` is
 * `Date.now()` rendered in base 36 and `<token>` is `generateSecureToken(4)`.
 */
function createCompactionDiagId(): string {
  const timePart = Date.now().toString(36);
  const tokenPart = generateSecureToken(4);
  return `ovf-${timePart}-${tokenPart}`;
}
|
||||
|
||||
// Defensive guard for the outer run loop across all retry branches.
const BASE_RUN_RETRY_ITERATIONS = 24;
const RUN_RETRY_ITERATIONS_PER_PROFILE = 8;
const MIN_RUN_RETRY_ITERATIONS = 32;
const MAX_RUN_RETRY_ITERATIONS = 160;

/**
 * Compute the iteration cap for the run retry loop.
 *
 * The cap is `BASE + max(1, profileCandidateCount) * PER_PROFILE`, clamped to
 * the `[MIN_RUN_RETRY_ITERATIONS, MAX_RUN_RETRY_ITERATIONS]` range.
 *
 * @param profileCandidateCount Number of profile candidates; values below 1
 *   (and NaN) are treated as 1.
 * @returns A finite iteration cap between 32 and 160 inclusive.
 */
function resolveMaxRunRetryIterations(profileCandidateCount: number): number {
  // NaN survives Math.max/Math.min, and a NaN cap never compares as reached,
  // which would silently disable the guard. Fall back to a single profile.
  const profileCount = Number.isNaN(profileCandidateCount)
    ? 1
    : Math.max(1, profileCandidateCount);
  const scaled = BASE_RUN_RETRY_ITERATIONS + profileCount * RUN_RETRY_ITERATIONS_PER_PROFILE;
  return Math.min(MAX_RUN_RETRY_ITERATIONS, Math.max(MIN_RUN_RETRY_ITERATIONS, scaled));
}
|
||||
|
||||
/**
 * Pick the provider/model pair to attribute an error to.
 *
 * Values carried by `lastAssistant` win; a field that is `null`/`undefined`
 * there (or a missing `lastAssistant`) falls back to the corresponding
 * `provider` / `model` argument. Empty strings are kept as-is because only
 * nullish values trigger the fallback.
 *
 * @returns The resolved `{ provider, model }` pair.
 */
function resolveActiveErrorContext(params: {
  lastAssistant: { provider?: string; model?: string } | null | undefined;
  provider: string;
  model: string;
}): { provider: string; model: string } {
  const { lastAssistant, provider, model } = params;
  return {
    provider: lastAssistant?.provider ?? provider,
    model: lastAssistant?.model ?? model,
  };
}
|
||||
|
||||
/**
 * Assemble the usage-related fields of an agent meta record.
 *
 * - `usage` comes from `toNormalizedUsage(usageAccumulator)`.
 * - `lastCallUsage` comes from `resolveLastCallUsage(lastAssistantUsage, usageAccumulator)`.
 * - `promptTokens` comes from `derivePromptTokens(lastRunPromptUsage)`.
 */
function buildUsageAgentMetaFields(params: {
  usageAccumulator: UsageAccumulator;
  lastAssistantUsage?: UsageLike | null;
  lastRunPromptUsage: ReturnType<typeof normalizeUsage> | undefined;
  /** API-reported total from the most recent call, mirroring the success path correction. */
  lastTurnTotal?: number;
}): Pick<EmbeddedPiAgentMeta, "usage" | "lastCallUsage" | "promptTokens"> {
  const { usageAccumulator, lastAssistantUsage, lastRunPromptUsage, lastTurnTotal } = params;
  const usage = toNormalizedUsage(usageAccumulator);
  // Keep `usage.total` aligned with the API-reported latest-call total when
  // available; accumulated totals are for billing, not context display.
  // Missing, zero, negative and NaN totals all leave the accumulated value alone.
  if (usage && (lastTurnTotal ?? 0) > 0) {
    usage.total = lastTurnTotal;
  }
  const lastCallUsage = resolveLastCallUsage(lastAssistantUsage, usageAccumulator);
  const promptTokens = derivePromptTokens(lastRunPromptUsage);
  return { usage, lastCallUsage, promptTokens };
}
|
||||
|
||||
/**
 * Build agentMeta for error return paths, preserving accumulated usage so that
 * session totalTokens reflects the actual context size rather than going stale.
 * Without this, error returns omit usage and the session keeps whatever
 * totalTokens was set by the previous successful run.
 *
 * `sessionId`, `provider` and `model` are copied through unchanged; each usage
 * field is attached only when `buildUsageAgentMetaFields` produced a truthy value.
 */
function buildErrorAgentMeta(params: {
  sessionId: string;
  provider: string;
  model: string;
  usageAccumulator: UsageAccumulator;
  lastRunPromptUsage: ReturnType<typeof normalizeUsage> | undefined;
  lastAssistant?: { usage?: unknown } | null;
  /** API-reported total from the most recent call, mirroring the success path correction. */
  lastTurnTotal?: number;
}): EmbeddedPiAgentMeta {
  const { usage, lastCallUsage, promptTokens } = buildUsageAgentMetaFields({
    usageAccumulator: params.usageAccumulator,
    // `usage` is typed `unknown` on the assistant record; it is passed along unvalidated.
    lastAssistantUsage: params.lastAssistant?.usage as UsageLike | undefined,
    lastRunPromptUsage: params.lastRunPromptUsage,
    lastTurnTotal: params.lastTurnTotal,
  });
  return {
    sessionId: params.sessionId,
    provider: params.provider,
    model: params.model,
    // Only include usage fields when we have actual data from prior API calls.
    ...(usage ? { usage } : {}),
    ...(lastCallUsage ? { lastCallUsage } : {}),
    ...(promptTokens ? { promptTokens } : {}),
  };
}
|
||||
|
||||
export async function runEmbeddedPiAgent(
|
||||
params: RunEmbeddedPiAgentParams,
|
||||
): Promise<EmbeddedPiRunResult> {
|
||||
|
||||
128
src/agents/pi-embedded-runner/run/helpers.ts
Normal file
128
src/agents/pi-embedded-runner/run/helpers.ts
Normal file
@@ -0,0 +1,128 @@
|
||||
import { type BackoffPolicy } from "../../../infra/backoff.js";
|
||||
import { generateSecureToken } from "../../../infra/secure-random.js";
|
||||
import { derivePromptTokens, normalizeUsage } from "../../usage.js";
|
||||
import type { EmbeddedPiAgentMeta } from "../types.js";
|
||||
import { toLastCallUsage, toNormalizedUsage, type UsageAccumulator } from "../usage-accumulator.js";
|
||||
|
||||
/**
 * Loose usage-counter shape accepted by the usage helpers in this module.
 * Every field is optional.
 * NOTE(review): counters are presumably token counts — confirm against the producers.
 */
type UsageSnapshot = {
  input?: number;
  output?: number;
  cacheRead?: number;
  cacheWrite?: number;
  total?: number;
};
|
||||
|
||||
/**
 * Bookkeeping record for runtime-resolved provider auth.
 *
 * NOTE(review): the per-field notes are read off the field names and types
 * only; the code that fills this record is outside this view — confirm
 * before relying on them.
 */
export type RuntimeAuthState = {
  /** Presumably the API key the runtime credential was derived from — confirm. */
  sourceApiKey: string;
  /** Auth mode label; the set of valid values is not visible here. */
  authMode: string;
  /** Optional auth profile identifier. */
  profileId?: string;
  /** Optional expiry; presumably epoch milliseconds — TODO confirm unit. */
  expiresAt?: number;
  /** Timer handle as returned by `setTimeout`; presumably the scheduled refresh. */
  refreshTimer?: ReturnType<typeof setTimeout>;
  /** Promise that settles with no value; presumably the refresh currently running. */
  refreshInFlight?: Promise<void>;
};
|
||||
|
||||
// Runtime-auth refresh timings, all in milliseconds.
// NOTE(review): how each value is applied is decided by the importing module.

/** Five minutes. */
export const RUNTIME_AUTH_REFRESH_MARGIN_MS = 5 * 60 * 1000;
/** One minute. */
export const RUNTIME_AUTH_REFRESH_RETRY_MS = 60 * 1000;
/** Five seconds. */
export const RUNTIME_AUTH_REFRESH_MIN_DELAY_MS = 5 * 1000;
|
||||
|
||||
// Keep overload pacing noticeable enough to avoid tight retry bursts, but short
// enough that fallback still feels responsive within a single turn.
export const OVERLOAD_FAILOVER_BACKOFF_POLICY: BackoffPolicy = {
  initialMs: 250,
  maxMs: 1_500,
  factor: 2,
  jitter: 0.2,
};
|
||||
|
||||
const ANTHROPIC_MAGIC_STRING_TRIGGER_REFUSAL = "ANTHROPIC_MAGIC_STRING_TRIGGER_REFUSAL";
const ANTHROPIC_MAGIC_STRING_REPLACEMENT = "ANTHROPIC MAGIC STRING TRIGGER REFUSAL (redacted)";

/**
 * Avoid Anthropic's refusal test token poisoning session transcripts.
 *
 * Replaces every occurrence of the trigger token in `prompt` with a redacted,
 * space-separated placeholder.
 *
 * @param prompt Text to scrub.
 * @returns `prompt` itself when the token is absent, otherwise a copy with
 *   all occurrences replaced.
 */
export function scrubAnthropicRefusalMagic(prompt: string): string {
  // Fast path: hand back the original string untouched when there is nothing to scrub.
  if (!prompt.includes(ANTHROPIC_MAGIC_STRING_TRIGGER_REFUSAL)) {
    return prompt;
  }
  return prompt.replaceAll(
    ANTHROPIC_MAGIC_STRING_TRIGGER_REFUSAL,
    ANTHROPIC_MAGIC_STRING_REPLACEMENT,
  );
}
|
||||
|
||||
/**
 * Create a diagnostic id of the form `ovf-<time>-<token>`, where `<time>` is
 * `Date.now()` rendered in base 36 and `<token>` is `generateSecureToken(4)`.
 */
export function createCompactionDiagId(): string {
  const timePart = Date.now().toString(36);
  const tokenPart = generateSecureToken(4);
  return `ovf-${timePart}-${tokenPart}`;
}
|
||||
|
||||
const BASE_RUN_RETRY_ITERATIONS = 24;
const RUN_RETRY_ITERATIONS_PER_PROFILE = 8;
const MIN_RUN_RETRY_ITERATIONS = 32;
const MAX_RUN_RETRY_ITERATIONS = 160;

/**
 * Defensive guard for the outer run loop across all retry branches.
 *
 * The cap is `BASE + max(1, profileCandidateCount) * PER_PROFILE`, clamped to
 * the `[MIN_RUN_RETRY_ITERATIONS, MAX_RUN_RETRY_ITERATIONS]` range.
 *
 * @param profileCandidateCount Number of profile candidates; values below 1
 *   (and NaN) are treated as 1.
 * @returns A finite iteration cap between 32 and 160 inclusive.
 */
export function resolveMaxRunRetryIterations(profileCandidateCount: number): number {
  // NaN survives Math.max/Math.min, and a NaN cap never compares as reached,
  // which would silently disable the guard. Fall back to a single profile.
  const profileCount = Number.isNaN(profileCandidateCount)
    ? 1
    : Math.max(1, profileCandidateCount);
  const scaled = BASE_RUN_RETRY_ITERATIONS + profileCount * RUN_RETRY_ITERATIONS_PER_PROFILE;
  return Math.min(MAX_RUN_RETRY_ITERATIONS, Math.max(MIN_RUN_RETRY_ITERATIONS, scaled));
}
|
||||
|
||||
/**
 * Pick the provider/model pair to attribute an error to.
 *
 * Values carried by `lastAssistant` win; a field that is `null`/`undefined`
 * there (or a missing `lastAssistant`) falls back to the corresponding
 * `provider` / `model` argument. Empty strings are kept as-is because only
 * nullish values trigger the fallback.
 *
 * @returns The resolved `{ provider, model }` pair.
 */
export function resolveActiveErrorContext(params: {
  lastAssistant: { provider?: string; model?: string } | null | undefined;
  provider: string;
  model: string;
}): { provider: string; model: string } {
  const { lastAssistant, provider, model } = params;
  return {
    provider: lastAssistant?.provider ?? provider,
    model: lastAssistant?.model ?? model,
  };
}
|
||||
|
||||
/**
 * Assemble the usage-related fields of an agent meta record.
 *
 * - `usage` comes from `toNormalizedUsage(usageAccumulator)`.
 * - `lastCallUsage` is `normalizeUsage(lastAssistantUsage)` when that yields a
 *   non-nullish value, otherwise `toLastCallUsage(usageAccumulator)`.
 * - `promptTokens` comes from `derivePromptTokens(lastRunPromptUsage)`.
 */
export function buildUsageAgentMetaFields(params: {
  usageAccumulator: UsageAccumulator;
  lastAssistantUsage?: UsageSnapshot | null;
  lastRunPromptUsage: UsageSnapshot | undefined;
  /** When positive, overrides `usage.total` with this value. */
  lastTurnTotal?: number;
}): Pick<EmbeddedPiAgentMeta, "usage" | "lastCallUsage" | "promptTokens"> {
  const { usageAccumulator, lastAssistantUsage, lastRunPromptUsage, lastTurnTotal } = params;
  const usage = toNormalizedUsage(usageAccumulator);
  // Missing, zero, negative and NaN totals all leave the accumulated value alone.
  if (usage && (lastTurnTotal ?? 0) > 0) {
    usage.total = lastTurnTotal;
  }
  // NOTE(review): `as never` bypasses the parameter type of `normalizeUsage`;
  // kept as-is because that signature is not visible from this module view.
  const lastCallUsage =
    normalizeUsage(lastAssistantUsage as never) ?? toLastCallUsage(usageAccumulator);
  const promptTokens = derivePromptTokens(lastRunPromptUsage);
  return { usage, lastCallUsage, promptTokens };
}
|
||||
|
||||
/**
 * Build agentMeta for error return paths, preserving accumulated usage so that
 * session totalTokens reflects the actual context size rather than going stale.
 * Without this, error returns omit usage and the session keeps whatever
 * totalTokens was set by the previous successful run.
 *
 * `sessionId`, `provider` and `model` are copied through unchanged; each usage
 * field is attached only when `buildUsageAgentMetaFields` produced a truthy value.
 */
export function buildErrorAgentMeta(params: {
  sessionId: string;
  provider: string;
  model: string;
  usageAccumulator: UsageAccumulator;
  lastRunPromptUsage: UsageSnapshot | undefined;
  lastAssistant?: { usage?: unknown } | null;
  /** When positive, overrides `usage.total` with this value. */
  lastTurnTotal?: number;
}): EmbeddedPiAgentMeta {
  const { usage, lastCallUsage, promptTokens } = buildUsageAgentMetaFields({
    usageAccumulator: params.usageAccumulator,
    // `usage` is typed `unknown` on the assistant record; it is passed along unvalidated.
    lastAssistantUsage: params.lastAssistant?.usage as UsageSnapshot | undefined,
    lastRunPromptUsage: params.lastRunPromptUsage,
    lastTurnTotal: params.lastTurnTotal,
  });
  return {
    sessionId: params.sessionId,
    provider: params.provider,
    model: params.model,
    // Only include usage fields when there is actual data to report.
    ...(usage ? { usage } : {}),
    ...(lastCallUsage ? { lastCallUsage } : {}),
    ...(promptTokens ? { promptTokens } : {}),
  };
}
|
||||
Reference in New Issue
Block a user