Files
openclaw/extensions/amazon-bedrock/register.sync.runtime.ts
Rob Riggs 0ef1f36286 feat(bedrock): add service_tier parameter support
- Add resolveBedrockServiceTier() and createBedrockServiceTierWrapper()
  to bedrock-stream-wrappers.ts
- Export service tier functions from provider-stream-shared.ts SDK barrel
- Wire service tier into Bedrock provider wrapStreamFn
- Accepts serviceTier or service_tier via agents.defaults.params

Valid values: default, flex, priority, reserved

Authored by Deepseek-v4-Pro, reviewed by rob@mobilinkd.com.
2026-05-08 13:10:43 +05:30

669 lines
24 KiB
TypeScript

import type { StreamFn } from "@mariozechner/pi-agent-core";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
import { resolvePluginConfigObject } from "openclaw/plugin-sdk/plugin-config-runtime";
import type { OpenClawPluginApi, ProviderThinkingProfile } from "openclaw/plugin-sdk/plugin-entry";
import {
ANTHROPIC_BY_MODEL_REPLAY_HOOKS,
normalizeProviderId,
} from "openclaw/plugin-sdk/provider-model-shared";
import {
createBedrockNoCacheWrapper,
isAnthropicBedrockModel,
streamWithPayloadPatch,
} from "openclaw/plugin-sdk/provider-stream-shared";
import { refreshAwsSharedConfigCacheForBedrock } from "./aws-credential-refresh.js";
import { mergeImplicitBedrockProvider, resolveBedrockConfigApiKey } from "./discovery-shared.js";
import { bedrockMemoryEmbeddingProviderAdapter } from "./memory-embedding-adapter.js";
// Bedrock guardrail settings injected into every Converse request payload
// (see createGuardrailWrapStreamFn).
type GuardrailConfig = {
  guardrailIdentifier: string;
  guardrailVersion: string;
  // How guardrail evaluation interleaves with streamed output.
  streamProcessingMode?: "sync" | "async";
  // Guardrail trace verbosity requested on responses.
  trace?: "enabled" | "disabled" | "enabled_full";
};
// Shape of this plugin's configuration object (resolved via
// resolveCurrentPluginConfig at runtime, or api.pluginConfig at startup).
type AmazonBedrockPluginConfig = {
  discovery?: {
    enabled?: boolean;
    region?: string;
    providerFilter?: string[];
    refreshInterval?: number;
    defaultContextWindow?: number;
    defaultMaxTokens?: number;
  };
  guardrail?: GuardrailConfig;
};
/** Service tiers Bedrock accepts, lowercase. */
const BEDROCK_SERVICE_TIER_VALUES = ["flex", "priority", "default", "reserved"] as const;

/** Union of the valid Bedrock service tier identifiers. */
type BedrockServiceTier = (typeof BEDROCK_SERVICE_TIER_VALUES)[number];

/** Type guard narrowing `value` to a known Bedrock service tier. */
function isBedrockServiceTier(value: string): value is BedrockServiceTier {
  return (BEDROCK_SERVICE_TIER_VALUES as readonly string[]).includes(value);
}

/**
 * Read the service tier from agent extra params, accepting both `serviceTier`
 * and `service_tier` spellings. Non-string values are ignored silently;
 * unrecognized string values are dropped with a warning.
 */
function resolveBedrockServiceTier(
  extraParams: Record<string, unknown> | undefined,
  warn: (message: string) => void,
): BedrockServiceTier | undefined {
  const candidate = extraParams?.serviceTier ?? extraParams?.service_tier;
  if (typeof candidate !== "string") {
    return undefined;
  }
  const tier = candidate.trim().toLowerCase();
  if (!isBedrockServiceTier(tier)) {
    warn(`ignoring invalid Bedrock service_tier param: ${candidate}`);
    return undefined;
  }
  return tier;
}
/**
 * Wrap a stream function so that Converse-stream requests carry the given
 * service tier. Other APIs pass through untouched, and an explicit
 * `serviceTier` already on the payload is respected.
 */
function createBedrockServiceTierWrapper(
  underlying: StreamFn,
  serviceTier: BedrockServiceTier,
): StreamFn {
  return (model, context, options) => {
    // Only the Converse streaming API understands the serviceTier field.
    if (model.api === "bedrock-converse-stream") {
      return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => {
        payloadObj.serviceTier ??= { type: serviceTier };
      });
    }
    return underlying(model, context, options);
  };
}
/**
 * Decorate an inner wrapStreamFn so every request payload carries the
 * configured guardrail. Optional fields (streamProcessingMode, trace) are
 * only emitted when set. Falsy results from the inner wrapper pass through.
 */
function createGuardrailWrapStreamFn(
  innerWrapStreamFn: (ctx: { modelId: string; streamFn?: StreamFn }) => StreamFn | null | undefined,
  guardrailConfig: GuardrailConfig,
): (ctx: { modelId: string; streamFn?: StreamFn }) => StreamFn | null | undefined {
  return (ctx) => {
    const wrappedStream = innerWrapStreamFn(ctx);
    if (!wrappedStream) {
      return wrappedStream;
    }
    return (model, context, options) =>
      streamWithPayloadPatch(wrappedStream, model, context, options, (payload) => {
        const guardrailPayload: Record<string, unknown> = {
          guardrailIdentifier: guardrailConfig.guardrailIdentifier,
          guardrailVersion: guardrailConfig.guardrailVersion,
        };
        if (guardrailConfig.streamProcessingMode) {
          guardrailPayload.streamProcessingMode = guardrailConfig.streamProcessingMode;
        }
        if (guardrailConfig.trace) {
          guardrailPayload.trace = guardrailConfig.trace;
        }
        payload.guardrailConfig = guardrailPayload;
      });
  };
}
/**
 * Mirrors the shipped pi-ai Bedrock `supportsPromptCaching` matcher.
 * Keep this in sync with node_modules/@mariozechner/pi-ai/dist/providers/amazon-bedrock.js.
 */
function matchesPiAiPromptCachingModelId(modelId: string): boolean {
  const id = modelId.toLowerCase();
  if (!id.includes("claude")) {
    return false;
  }
  // Caching-capable families: Claude 4.x ("-4-" / "-4."), 3.7 Sonnet, 3.5 Haiku.
  const cachingMarkers = ["-4-", "-4.", "claude-3-7-sonnet", "claude-3-5-haiku"];
  return cachingMarkers.some((marker) => id.includes(marker));
}

/** Whether pi-ai itself would inject cache points for this model id. */
function piAiWouldInjectCachePoints(modelId: string): boolean {
  return matchesPiAiPromptCachingModelId(modelId);
}
/** Matches application inference profile ARNs in the aws, aws-cn, and aws-us-gov partitions. */
const APP_INFERENCE_PROFILE_ARN_RE =
  /^arn:aws(-cn|-us-gov)?:bedrock:.*:application-inference-profile\//i;

/**
 * Detect Bedrock application inference profile ARNs — these are the only IDs
 * where pi-ai's model-name-based checks fail because the ARN is opaque.
 * System-defined profiles (us., eu., global.) and base model IDs always
 * contain the model name and are handled by pi-ai natively.
 */
function isBedrockAppInferenceProfile(modelId: string): boolean {
  return APP_INFERENCE_PROFILE_ARN_RE.test(modelId);
}
/**
 * pi-ai's internal `supportsPromptCaching` checks `model.id` for specific Claude
 * model name patterns, which fails for application inference profile ARNs (opaque
 * IDs that may not contain the model name). When OpenClaw's `isAnthropicBedrockModel`
 * identifies the model but pi-ai won't inject cache points, we do it via onPayload.
 *
 * Gated to application inference profile ARNs only — regular Claude model IDs and
 * system-defined inference profiles (us.anthropic.claude-*) are left to pi-ai.
 */
function needsCachePointInjection(modelId: string): boolean {
  // Must be an app-profile ARN that pi-ai would NOT already handle, and that
  // OpenClaw's ARN heuristic still recognizes as Anthropic.
  return (
    isBedrockAppInferenceProfile(modelId) &&
    !piAiWouldInjectCachePoints(modelId) &&
    isAnthropicBedrockModel(modelId)
  );
}
/**
 * Extract the region from a Bedrock ARN.
 * e.g. "arn:aws:bedrock:us-east-1:123:application-inference-profile/abc" → "us-east-1"
 * Returns undefined when the region segment is missing or empty.
 */
function extractRegionFromArn(arn: string): string | undefined {
  // ARN format: arn:partition:service:region:account:resource
  const region = arn.split(":")[3];
  return region ? region : undefined;
}
/**
 * Check if a resolved foundation model ARN supports prompt caching using the
 * same matcher pi-ai uses for direct model IDs.
 *
 * This works because foundation-model ARNs embed the model name (unlike opaque
 * application inference profile ARNs), so the substring matcher applies.
 */
function resolvedModelSupportsCaching(modelArn: string): boolean {
  return matchesPiAiPromptCachingModelId(modelArn);
}
// Claude Opus 4.7 in any form: bare model id, region-prefixed system profile
// (us./eu./global./…), or embedded in an ARN ("/", ".", ":" boundaries).
const OPUS_47_MODEL_REF_RE =
  /(?:^|[/.:])(?:(?:us|eu|ap|apac|au|jp|global)\.)?anthropic\.claude-opus-4[.-]7(?:$|[-.:/])/i;

/** True when the model reference names Claude Opus 4.7. */
function isOpus47BedrockModelRef(modelRef: string): boolean {
  return OPUS_47_MODEL_REF_RE.test(modelRef);
}
/**
 * Resolve the underlying foundation model for an application inference profile
 * via GetInferenceProfile. Results are cached so we only call the API once per
 * profile ARN. Returns traits needed for request shaping when the model id is
 * otherwise opaque.
 *
 * Region is extracted from the profile ARN itself to avoid mismatches when
 * the OpenClaw config region differs from the profile's home region.
 */
type BedrockAppProfileTraits = {
  // Every resolved foundation model supports prompt caching.
  cacheEligible: boolean;
  // Some resolved foundation model is Claude Opus 4.7, whose `temperature`
  // must be dropped (see omitDeprecatedOpus47PayloadTemperature).
  omitTemperature: boolean;
};
// Memoized traits per profile ARN; only successful lookups are stored.
const appProfileTraitsCache = new Map<string, BedrockAppProfileTraits>();
// Minimal slice of the GetInferenceProfile response that we consume.
type BedrockGetInferenceProfileResponse = {
  models?: Array<{ modelArn?: string }>;
};
// Narrow control-plane surface — small enough to fake in tests.
type BedrockControlPlane = {
  getInferenceProfile: (input: {
    inferenceProfileIdentifier: string;
  }) => Promise<BedrockGetInferenceProfileResponse>;
};
type BedrockControlPlaneFactory = (region: string | undefined) => BedrockControlPlane;
// Test seam: when set, replaces the real AWS SDK client factory.
let bedrockControlPlaneOverride: BedrockControlPlaneFactory | undefined;
/** Test hook: clear the memoized app-profile traits. */
export function resetBedrockAppProfileCacheEligibilityForTest(): void {
  appProfileTraitsCache.clear();
}
/**
 * Test hook: override (or restore, with undefined) the Bedrock control-plane
 * factory. Also clears the traits cache so stale entries cannot leak between tests.
 */
export function setBedrockAppProfileControlPlaneForTest(
  controlPlane: BedrockControlPlaneFactory | undefined,
): void {
  bedrockControlPlaneOverride = controlPlane;
  resetBedrockAppProfileCacheEligibilityForTest();
}
/**
 * Build a minimal Bedrock control-plane client exposing GetInferenceProfile.
 * Honors the test override when present; otherwise refreshes the shared AWS
 * config cache and lazily imports the AWS SDK so it loads only when needed.
 */
async function createBedrockControlPlane(region: string | undefined): Promise<BedrockControlPlane> {
  if (bedrockControlPlaneOverride) {
    return bedrockControlPlaneOverride(region);
  }
  await refreshAwsSharedConfigCacheForBedrock();
  const { BedrockClient, GetInferenceProfileCommand } = await import("@aws-sdk/client-bedrock");
  // An empty config lets the SDK resolve the region from the environment.
  const client = new BedrockClient(region ? { region } : {});
  return {
    getInferenceProfile: async (input) => await client.send(new GetInferenceProfileCommand(input)),
  };
}
/**
 * Resolve request-shaping traits for an application inference profile via
 * GetInferenceProfile, memoized per profile ARN. On any failure the ARN
 * heuristics are returned instead — without caching, so the API is retried
 * on the next request.
 */
async function resolveAppProfileTraits(
  modelId: string,
  fallbackRegion: string | undefined,
): Promise<BedrockAppProfileTraits> {
  const cached = appProfileTraitsCache.get(modelId);
  if (cached) {
    return cached;
  }
  try {
    // Prefer the profile's home region (from the ARN) over the config region.
    const region = extractRegionFromArn(modelId) ?? fallbackRegion;
    const controlPlane = await createBedrockControlPlane(region);
    const resp = await controlPlane.getInferenceProfile({ inferenceProfileIdentifier: modelId });
    const models = resp.models ?? [];
    const modelArns = models.map((m: { modelArn?: string }) => m.modelArn ?? "");
    const traits = {
      // Cache-eligible only when the profile resolved to at least one model
      // AND every resolved model supports prompt caching.
      cacheEligible:
        models.length > 0 && modelArns.every((modelArn) => resolvedModelSupportsCaching(modelArn)),
      // Drop temperature when any resolved model is Opus 4.7.
      omitTemperature: modelArns.some(isOpus47BedrockModelRef),
    };
    appProfileTraitsCache.set(modelId, traits);
    return traits;
  } catch {
    // Transient failures (throttling, network, IAM) should not be cached —
    // return the heuristic fallback but allow retry on the next request.
    return {
      cacheEligible: isAnthropicBedrockModel(modelId),
      omitTemperature: isOpus47BedrockModelRef(modelId),
    };
  }
}
type BedrockCachePoint = { cachePoint: { type: "default"; ttl?: string } };
type BedrockContentBlock = Record<string, unknown>;
type BedrockMessage = { role?: string; content?: BedrockContentBlock[] };

/** True when any block in the list already carries a cachePoint marker. */
function hasCachePoint(blocks: BedrockContentBlock[] | undefined): boolean {
  if (!blocks) {
    return false;
  }
  return blocks.some((block) => block.cachePoint != null);
}

/** Build a Converse cachePoint block; "long" retention requests a 1h TTL. */
function makeCachePoint(cacheRetention: string | undefined): BedrockCachePoint {
  const cachePoint: BedrockCachePoint["cachePoint"] = { type: "default" };
  if (cacheRetention === "long") {
    cachePoint.ttl = "1h";
  }
  return { cachePoint };
}

/**
 * Inject Bedrock Converse cache points into the payload when pi-ai skipped them
 * because it didn't recognize the model ID (application inference profiles).
 * Adds one marker to the system prompt and one to the most recent user turn,
 * skipping each when a marker is already present. A retention of undefined or
 * "none" disables injection entirely.
 */
function injectBedrockCachePoints(
  payload: Record<string, unknown>,
  cacheRetention: string | undefined,
): void {
  if (!cacheRetention || cacheRetention === "none") {
    return;
  }
  const point = makeCachePoint(cacheRetention);
  const system = payload.system as BedrockContentBlock[] | undefined;
  if (Array.isArray(system) && system.length > 0 && !hasCachePoint(system)) {
    system.push(point);
  }
  const messages = payload.messages as BedrockMessage[] | undefined;
  if (!Array.isArray(messages)) {
    return;
  }
  // Walk backwards to the most recent user turn (Converse roles are lowercase).
  for (let idx = messages.length - 1; idx >= 0; idx--) {
    const message = messages[idx];
    if (message.role !== "user" || !Array.isArray(message.content)) {
      continue;
    }
    if (!hasCachePoint(message.content)) {
      message.content.push(point);
    }
    break;
  }
}
/**
 * Force `additionalModelRequestFields.output_config.effort = "max"` on the
 * payload, creating the intermediate objects when missing (or when they hold
 * non-object values) while preserving any sibling fields.
 */
function patchOpus47MaxThinkingEffort(payload: Record<string, unknown>): void {
  // Reuse an existing plain object; anything else (array, primitive, null)
  // is replaced with a fresh one.
  const asRecord = (value: unknown): Record<string, unknown> =>
    value && typeof value === "object" && !Array.isArray(value)
      ? (value as Record<string, unknown>)
      : {};
  const fields = asRecord(payload.additionalModelRequestFields);
  const outputConfig = asRecord(fields.output_config);
  outputConfig.effort = "max";
  fields.output_config = outputConfig;
  payload.additionalModelRequestFields = fields;
}
/**
 * Register the Amazon Bedrock provider with OpenClaw: memory-embedding
 * adapter, catalog discovery, Anthropic replay hooks, stream wrapping
 * (guardrails, service tier, region routing, cache-point injection,
 * Opus 4.7 payload fixups, AWS credential refresh), and error classification.
 */
export function registerAmazonBedrockPlugin(api: OpenClawPluginApi): void {
  // Keep registration-local constants inside the function so partial module
  // initialization during test bootstrap cannot trip TDZ reads.
  const providerId = "amazon-bedrock";
  // Claude Opus/Sonnet 4.6 family (either "." or "-" separator).
  const claude46ModelRe = /claude-(?:opus|sonnet)-4(?:\.|-)6(?:$|[-.])/i;
  const baseClaudeThinkingLevels = [
    { id: "off" },
    { id: "minimal" },
    { id: "low" },
    { id: "medium" },
    { id: "high" },
  ] as const satisfies ProviderThinkingProfile["levels"];
  // Match region from bedrock-runtime (Converse API) URLs.
  // e.g. https://bedrock-runtime.us-east-1.amazonaws.com
  const bedrockRegionRe = /bedrock-runtime\.([a-z0-9-]+)\.amazonaws\./;
  // Error-message patterns that indicate the request exceeded the context window.
  const bedrockContextOverflowPatterns = [
    /ValidationException.*(?:input is too long|max input token|input token.*exceed)/i,
    /ValidationException.*(?:exceeds? the (?:maximum|max) (?:number of )?(?:input )?tokens)/i,
    /ModelStreamErrorException.*(?:Input is too long|too many input tokens)/i,
  ] as const;
  // Matches ValidationExceptions about a deprecated `temperature` param,
  // regardless of which keyword appears first in the message.
  const deprecatedTemperatureValidationRe =
    /ValidationException[\s\S]*(?:invalid_request_error[\s\S]*)?temperature[\s\S]*deprecated|ValidationException[\s\S]*deprecated[\s\S]*temperature/i;
  const anthropicByModelReplayHooks = ANTHROPIC_BY_MODEL_REPLAY_HOOKS;
  // Config snapshot taken at registration; used only when no runtime config is passed.
  const startupPluginConfig = (api.pluginConfig ?? {}) as AmazonBedrockPluginConfig;
  // Thinking-level profile per Claude family: Opus 4.7 adds xhigh/adaptive/max
  // (default off); 4.6 adds adaptive and defaults to it; others get the base set.
  function resolveBedrockClaudeThinkingProfile(modelId: string): ProviderThinkingProfile {
    const trimmed = modelId.trim();
    if (isOpus47BedrockModelRef(trimmed)) {
      return {
        levels: [...baseClaudeThinkingLevels, { id: "xhigh" }, { id: "adaptive" }, { id: "max" }],
        defaultLevel: "off",
      };
    }
    if (claude46ModelRe.test(trimmed)) {
      return {
        levels: [...baseClaudeThinkingLevels, { id: "adaptive" }],
        defaultLevel: "adaptive",
      };
    }
    return { levels: baseClaudeThinkingLevels };
  }
  // Prefer runtime plugin config; fall back to the startup snapshot only when
  // no config object was provided at all.
  function resolveCurrentPluginConfig(
    config: OpenClawConfig | undefined,
  ): AmazonBedrockPluginConfig | undefined {
    const runtimePluginConfig = resolvePluginConfigObject(config, providerId);
    return (
      (runtimePluginConfig as AmazonBedrockPluginConfig | undefined) ??
      (config ? undefined : startupPluginConfig)
    );
  }
  api.registerMemoryEmbeddingProvider(bedrockMemoryEmbeddingProviderAdapter);
  // Base wrapping: pass Anthropic models and opaque app-profile ARNs through;
  // disable prompt caching for everything else.
  const baseWrapStreamFn = ({ modelId, streamFn }: { modelId: string; streamFn?: StreamFn }) => {
    if (isAnthropicBedrockModel(modelId)) {
      return streamFn;
    }
    // For app inference profiles with opaque IDs, don't force cacheRetention: "none"
    // yet — we may resolve them as Claude later via GetInferenceProfile.
    if (isBedrockAppInferenceProfile(modelId)) {
      return streamFn;
    }
    return createBedrockNoCacheWrapper(streamFn);
  };
  // Drop the deprecated `temperature` option (copy-on-write) for Opus 4.7.
  function omitDeprecatedOpus47Temperature<TOptions extends object>(
    modelId: string,
    options: TOptions,
  ): TOptions {
    if (!isOpus47BedrockModelRef(modelId) || !("temperature" in options)) {
      return options;
    }
    const next = { ...options } as typeof options & { temperature?: unknown };
    delete next.temperature;
    return next;
  }
  // Same fixup at the payload level: strip inferenceConfig.temperature in place.
  function omitDeprecatedOpus47PayloadTemperature(payload: Record<string, unknown>): void {
    const inferenceConfig = payload.inferenceConfig;
    if (!inferenceConfig || typeof inferenceConfig !== "object") {
      return;
    }
    delete (inferenceConfig as Record<string, unknown>).temperature;
  }
  // Chain an AWS credential-cache refresh in front of any existing onPayload hook.
  function withAwsCredentialRefreshOnPayload<TOptions extends object>(
    options: TOptions,
  ): TOptions & { onPayload: (payload: unknown, payloadModel: unknown) => Promise<unknown> } {
    const originalOnPayload = (options as { onPayload?: unknown }).onPayload as
      | ((payload: unknown, model: unknown) => unknown)
      | undefined;
    return {
      ...options,
      onPayload: async (payload: unknown, payloadModel: unknown) => {
        await refreshAwsSharedConfigCacheForBedrock();
        return originalOnPayload?.(payload, payloadModel);
      },
    };
  }
  // Stream-level variant of the credential refresh; preserves null/undefined inputs.
  function createAwsCredentialRefreshStreamWrapper(
    streamFn: StreamFn | null | undefined,
  ): StreamFn | null | undefined {
    if (!streamFn) {
      return streamFn;
    }
    return (streamModel, context, options) =>
      streamFn(streamModel, context, withAwsCredentialRefreshOnPayload(Object.assign({}, options)));
  }
  /** Extract the AWS region from a bedrock-runtime baseUrl. */
  function extractRegionFromBaseUrl(baseUrl: string | undefined): string | undefined {
    if (!baseUrl) {
      return undefined;
    }
    return bedrockRegionRe.exec(baseUrl)?.[1];
  }
  /**
   * Resolve the AWS region for Bedrock API calls.
   * Provider-specific baseUrl wins over global bedrockDiscovery to avoid signing
   * with the wrong region when discovery and provider target different regions.
   */
  function resolveBedrockRegion(
    config:
      | { models?: { bedrockDiscovery?: { region?: string }; providers?: Record<string, unknown> } }
      | undefined,
  ): string | undefined {
    // Try provider-specific baseUrl first.
    const providers = config?.models?.providers;
    if (providers) {
      const exact = (providers[providerId] as { baseUrl?: string } | undefined)?.baseUrl;
      if (exact) {
        const region = extractRegionFromBaseUrl(exact);
        if (region) {
          return region;
        }
      }
      // Fall back to alias matches (e.g. "bedrock" instead of "amazon-bedrock").
      for (const [key, value] of Object.entries(providers)) {
        if (key === providerId || normalizeProviderId(key) !== providerId) {
          continue;
        }
        const region = extractRegionFromBaseUrl((value as { baseUrl?: string }).baseUrl);
        if (region) {
          return region;
        }
      }
    }
    return config?.models?.bedrockDiscovery?.region;
  }
  api.registerProvider({
    id: providerId,
    label: "Amazon Bedrock",
    docsPath: "/providers/models",
    auth: [],
    catalog: {
      order: "simple",
      run: async (ctx) => {
        // Discovery is imported lazily so registration stays cheap.
        const { resolveImplicitBedrockProvider } = await import("./discovery.js");
        const currentPluginConfig = resolveCurrentPluginConfig(ctx.config);
        const implicit = await resolveImplicitBedrockProvider({
          config: ctx.config,
          pluginConfig: currentPluginConfig,
          env: ctx.env,
        });
        if (!implicit) {
          return null;
        }
        return {
          provider: mergeImplicitBedrockProvider({
            existing: ctx.config.models?.providers?.[providerId],
            implicit,
          }),
        };
      },
    },
    resolveConfigApiKey: ({ env }) => resolveBedrockConfigApiKey(env),
    ...anthropicByModelReplayHooks,
    wrapStreamFn: ({ modelId, config, model, streamFn, thinkingLevel, extraParams }) => {
      // Layer 1: base wrapping, plus guardrail injection when fully configured.
      const currentGuardrail = resolveCurrentPluginConfig(config)?.guardrail;
      let wrapped =
        (currentGuardrail?.guardrailIdentifier && currentGuardrail?.guardrailVersion
          ? createGuardrailWrapStreamFn(baseWrapStreamFn, currentGuardrail)({ modelId, streamFn })
          : baseWrapStreamFn({ modelId, streamFn })) ?? undefined;
      // Layer 2: optional service tier from agents.defaults.params.
      const serviceTier = resolveBedrockServiceTier(extraParams, (message) =>
        api.logger.warn(message),
      );
      if (serviceTier && wrapped) {
        wrapped = createBedrockServiceTierWrapper(wrapped, serviceTier);
      }
      const region = resolveBedrockRegion(config) ?? extractRegionFromBaseUrl(model?.baseUrl);
      const mayNeedCacheInjection =
        isBedrockAppInferenceProfile(modelId) && !piAiWouldInjectCachePoints(modelId);
      const shouldOmitTemperature = isOpus47BedrockModelRef(modelId);
      const shouldPatchMaxThinking = shouldOmitTemperature && thinkingLevel === "max";
      // For known Anthropic models (heuristic match), enable injection immediately.
      // For opaque profile IDs, we'll resolve via GetInferenceProfile on first call.
      const heuristicMatch = needsCachePointInjection(modelId);
      // Nothing model-specific to do — only the credential-refresh wrapper applies.
      if (!region && !mayNeedCacheInjection && !shouldOmitTemperature && !shouldPatchMaxThinking) {
        return createAwsCredentialRefreshStreamWrapper(wrapped);
      }
      const underlying = wrapped ?? streamFn;
      if (!underlying) {
        return wrapped;
      }
      return (streamModel, context, options) => {
        // Merge in the resolved region and strip Opus 4.7 temperature up front.
        const merged = omitDeprecatedOpus47Temperature(
          modelId,
          Object.assign({}, options, region ? { region } : {}),
        );
        const originalOnPayload = merged.onPayload as
          | ((payload: unknown, model: unknown) => unknown)
          | undefined;
        if (!mayNeedCacheInjection) {
          // No cache-point work — only region/temperature/max-thinking fixups.
          return underlying(
            streamModel,
            context,
            withAwsCredentialRefreshOnPayload({
              ...merged,
              ...(shouldPatchMaxThinking
                ? {
                    onPayload: (payload: unknown, payloadModel: unknown) => {
                      if (payload && typeof payload === "object") {
                        patchOpus47MaxThinkingEffort(payload as Record<string, unknown>);
                      }
                      return originalOnPayload?.(payload, payloadModel);
                    },
                  }
                : {}),
            }),
          );
        }
        // Use the cacheRetention from options if explicitly set.
        // When undefined, default to "short" to match pi-ai's internal default.
        // Note: if the user set cacheRetention: "none" but the opaque ARN wasn't
        // recognized by resolveAnthropicCacheRetentionFamily, the value may have
        // been dropped upstream. This is a known limitation — the proper fix is
        // to also teach resolveAnthropicCacheRetentionFamily about opaque profiles
        // (tracked separately). In practice, users with app inference profiles
        // want caching enabled, so defaulting to "short" is the safer behavior.
        const cacheRetention =
          typeof merged.cacheRetention === "string" ? merged.cacheRetention : "short";
        if (heuristicMatch) {
          // Fast path: ARN heuristic already identified this as Claude, but the
          // concrete target may still need profile traits for Opus 4.7 payloads.
          const mayNeedTemperatureTrait = "temperature" in merged;
          return underlying(
            streamModel,
            context,
            withAwsCredentialRefreshOnPayload({
              ...merged,
              onPayload: async (payload: unknown, payloadModel: unknown) => {
                if (payload && typeof payload === "object") {
                  const payloadRecord = payload as Record<string, unknown>;
                  injectBedrockCachePoints(payloadRecord, cacheRetention);
                  if (shouldPatchMaxThinking) {
                    patchOpus47MaxThinkingEffort(payloadRecord);
                  }
                  if (mayNeedTemperatureTrait) {
                    // Memoized lookup: only needed to learn whether the
                    // underlying model deprecates temperature.
                    const traits = await resolveAppProfileTraits(modelId, region);
                    if (traits.omitTemperature) {
                      omitDeprecatedOpus47PayloadTemperature(payloadRecord);
                    }
                  }
                }
                return originalOnPayload?.(payload, payloadModel);
              },
            }),
          );
        }
        // Slow path: opaque profile ID — resolve underlying model via API (cached).
        // pi-ai's onPayload supports async, so we await the resolution inline.
        return underlying(
          streamModel,
          context,
          withAwsCredentialRefreshOnPayload({
            ...merged,
            onPayload: async (payload: unknown, payloadModel: unknown) => {
              const traits = await resolveAppProfileTraits(modelId, region);
              if (payload && typeof payload === "object") {
                const payloadRecord = payload as Record<string, unknown>;
                if (traits.cacheEligible) {
                  injectBedrockCachePoints(payloadRecord, cacheRetention);
                }
                if (shouldPatchMaxThinking) {
                  patchOpus47MaxThinkingEffort(payloadRecord);
                }
                if (traits.omitTemperature) {
                  omitDeprecatedOpus47PayloadTemperature(payloadRecord);
                }
              }
              return originalOnPayload?.(payload, payloadModel);
            },
          }),
        );
      };
    },
    matchesContextOverflowError: ({ errorMessage }) =>
      bedrockContextOverflowPatterns.some((pattern) => pattern.test(errorMessage)),
    // Map provider error messages onto OpenClaw failover reasons.
    classifyFailoverReason: ({ errorMessage }) => {
      if (/ThrottlingException|Too many concurrent requests/i.test(errorMessage)) {
        return "rate_limit";
      }
      if (/ModelNotReadyException/i.test(errorMessage)) {
        return "overloaded";
      }
      if (deprecatedTemperatureValidationRe.test(errorMessage)) {
        return "format";
      }
      return undefined;
    },
    resolveThinkingProfile: ({ modelId }) => resolveBedrockClaudeThinkingProfile(modelId),
  });
}