fix(gateway): abort stopped pricing refreshes

This commit is contained in:
Peter Steinberger
2026-05-02 09:14:54 +01:00
parent bccd50b09b
commit 7b5d95671c
3 changed files with 112 additions and 29 deletions

View File

@@ -31,6 +31,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Gateway/diagnostics: include a bounded redacted startup error message in stability bundles, so crash-loop reports identify the failing plugin or contract without exposing secrets. Refs #75797. Thanks @ymebosma.
- Gateway/pricing: abort in-flight model pricing catalog fetches when Gateway shutdown stops the refresh loop, and avoid post-stop cache writes or refresh timers. Fixes #72208. Thanks @rzcq.
- Control UI/Talk: allow the OpenAI Realtime WebRTC offer endpoint through the Control UI CSP, configure browser sessions with explicit VAD/transcription input settings, and surface OpenAI realtime error/lifecycle events instead of leaving Talk stuck as live with no diagnostic. Fixes #73427.
- Plugins: clarify config-selected duplicate plugin override diagnostics and document manifest schema updates for bundled-plugin forks. Fixes #8582. Thanks @sachah.
- Providers/OpenAI: resolve `keychain:<service>:<account>` `OPENAI_API_KEY` refs before creating OpenAI Realtime browser sessions or voice bridges, with a bounded cached Keychain lookup. Fixes #72120. Thanks @ctbritt.

View File

@@ -834,6 +834,54 @@ describe("model-pricing-cache", () => {
stop();
});
// Verifies that stopping the Gateway pricing refresh aborts any in-flight
// catalog fetches, cancels the scheduled refresh timer, and leaves no
// post-stop cache writes behind.
it("aborts in-flight bootstrap pricing fetches after stop", async () => {
// Minimal config: one configured model is enough to trigger both the
// OpenRouter and LiteLLM catalog fetches on bootstrap.
const config = {
agents: {
defaults: {
model: { primary: "anthropic/claude-opus-4-6" },
},
},
} as unknown as OpenClawConfig;
const abortedUrls: string[] = [];
// Spy on setTimeout so we can assert no daily refresh timer is armed after stop.
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
const fetchImpl = withFetchPreconnect(
vi.fn(
(input: RequestInfo | URL, init?: RequestInit) =>
// Never-resolving fetch: the promise only settles when the abort
// signal fires, simulating a request still in flight at stop time.
new Promise<Response>((_resolve, reject) => {
const url =
typeof input === "string" ? input : input instanceof URL ? input.href : input.url;
const signal = init?.signal;
// Every pricing fetch must carry an abort signal.
expect(signal).toBeDefined();
signal?.addEventListener(
"abort",
() => {
// Record which URL was aborted and reject with the abort reason,
// mirroring real fetch abort semantics.
abortedUrls.push(url);
reject(signal.reason);
},
{ once: true },
);
}),
),
);
try {
const stop = startGatewayModelPricingRefresh({ config, fetchImpl });
await vi.dynamicImportSettled();
// Bootstrap should have kicked off both catalog fetches (OpenRouter + LiteLLM).
expect(fetchImpl).toHaveBeenCalledTimes(2);
stop();
// Both in-flight fetches must be aborted by stop().
await vi.waitFor(() => expect(abortedUrls).toHaveLength(2));
await vi.dynamicImportSettled();
// No 24h refresh timer may be scheduled after stop.
expect(setTimeoutSpy.mock.calls.some(([, delay]) => delay === 24 * 60 * 60_000)).toBe(false);
// The aborted refresh must not have written anything into the pricing cache.
expect(
getCachedGatewayModelPricing({ provider: "anthropic", model: "claude-opus-4-6" }),
).toBeUndefined();
} finally {
setTimeoutSpy.mockRestore();
}
});
it("does not bootstrap remote pricing when pricing is disabled", async () => {
const config = {
agents: {

View File

@@ -50,6 +50,17 @@ type OpenRouterModelPayload = {
pricing?: unknown;
};
/**
 * Shared parameter shape for the pricing refresh entry points
 * (bootstrap, periodic refresh, and rescheduling).
 */
type GatewayModelPricingRefreshParams = {
config: OpenClawConfig;
// Process environment override; defaults to the ambient env when omitted.
env?: NodeJS.ProcessEnv;
// Injectable fetch for tests; defaults to the global fetch when omitted.
fetchImpl?: typeof fetch;
workspaceDir?: string;
pluginMetadataSnapshot?: PluginMetadataRegistryView;
pluginLookUpTable?: PluginMetadataRegistryView;
manifestRegistry?: PluginManifestRegistry;
// Abort signal wired to Gateway shutdown: cancels in-flight catalog
// fetches and suppresses post-stop cache writes and refresh timers.
signal?: AbortSignal;
};
type ExternalPricingPolicy = {
external: boolean;
openRouter?: ExternalPricingSourcePolicy;
@@ -143,6 +154,11 @@ function isTimeoutError(error: unknown): boolean {
return /\bTimeoutError\b/u.test(String(error));
}
/**
 * Builds the abort signal for a pricing catalog fetch: always bounded by the
 * fetch timeout, and additionally linked to the caller's shutdown signal when
 * one is provided (whichever aborts first wins).
 */
function createPricingFetchSignal(signal: AbortSignal | undefined): AbortSignal {
  const timeout = AbortSignal.timeout(FETCH_TIMEOUT_MS);
  if (!signal) {
    return timeout;
  }
  return AbortSignal.any([signal, timeout]);
}
function formatPricingFetchFailure(source: "LiteLLM" | "OpenRouter", error: unknown): string {
if (isTimeoutError(error)) {
return `${source} pricing fetch failed (timeout ${formatTimeoutSeconds(FETCH_TIMEOUT_MS)}): ${String(error)}`;
@@ -326,10 +342,13 @@ function parseLiteLLMPricing(entry: LiteLLMModelEntry): CachedModelPricing | nul
type LiteLLMPricingCatalog = Map<string, CachedModelPricing>;
async function fetchLiteLLMPricingCatalog(fetchImpl: typeof fetch): Promise<LiteLLMPricingCatalog> {
async function fetchLiteLLMPricingCatalog(
fetchImpl: typeof fetch,
signal?: AbortSignal,
): Promise<LiteLLMPricingCatalog> {
const response = await fetchImpl(LITELLM_PRICING_URL, {
headers: { Accept: "application/json" },
signal: AbortSignal.timeout(FETCH_TIMEOUT_MS),
signal: createPricingFetchSignal(signal),
});
if (!response.ok) {
throw new Error(`LiteLLM pricing fetch failed: HTTP ${response.status}`);
@@ -982,10 +1001,11 @@ export function collectConfiguredModelPricingRefs(
async function fetchOpenRouterPricingCatalog(
fetchImpl: typeof fetch,
signal?: AbortSignal,
): Promise<Map<string, OpenRouterPricingEntry>> {
const response = await fetchImpl(OPENROUTER_MODELS_URL, {
headers: { Accept: "application/json" },
signal: AbortSignal.timeout(FETCH_TIMEOUT_MS),
signal: createPricingFetchSignal(signal),
});
if (!response.ok) {
throw new Error(`OpenRouter /models failed: HTTP ${response.status}`);
@@ -1064,14 +1084,23 @@ function resolveLiteLLMPricingForRef(params: {
return undefined;
}
function scheduleRefresh(params: { config: OpenClawConfig; fetchImpl: typeof fetch }): void {
function scheduleRefresh(
params: GatewayModelPricingRefreshParams & { fetchImpl: typeof fetch },
): void {
clearRefreshTimer();
if (params.signal?.aborted) {
return;
}
refreshTimer = setTimeout(() => {
refreshTimer = null;
if (params.signal?.aborted) {
return;
}
void refreshGatewayModelPricingCache(params).catch((error: unknown) => {
log.warn(`pricing refresh failed: ${String(error)}`);
});
}, CACHE_TTL_MS);
refreshTimer.unref?.();
}
function collectSeededPricing(params: {
@@ -1100,19 +1129,16 @@ function collectSeededPricing(params: {
return seeded;
}
export async function refreshGatewayModelPricingCache(params: {
config: OpenClawConfig;
env?: NodeJS.ProcessEnv;
fetchImpl?: typeof fetch;
workspaceDir?: string;
pluginMetadataSnapshot?: PluginMetadataRegistryView;
pluginLookUpTable?: PluginMetadataRegistryView;
manifestRegistry?: PluginManifestRegistry;
}): Promise<void> {
export async function refreshGatewayModelPricingCache(
params: GatewayModelPricingRefreshParams,
): Promise<void> {
if (!isGatewayModelPricingEnabled(params.config)) {
clearRefreshTimer();
return;
}
if (params.signal?.aborted) {
return;
}
if (inFlightRefresh) {
return await inFlightRefresh;
}
@@ -1147,6 +1173,9 @@ export async function refreshGatewayModelPricingCache(params: {
allowPluginNormalization: normalizationOptions.allowPluginNormalization,
});
if (refs.length === 0) {
if (params.signal?.aborted) {
return;
}
replaceGatewayModelPricingCache(seededPricing);
clearRefreshTimer();
return;
@@ -1157,18 +1186,22 @@ export async function refreshGatewayModelPricingCache(params: {
let openRouterFailed = false;
let litellmFailed = false;
const [catalogById, litellmCatalog] = await Promise.all([
fetchOpenRouterPricingCatalog(fetchImpl).catch((error: unknown) => {
fetchOpenRouterPricingCatalog(fetchImpl, params.signal).catch((error: unknown) => {
log.warn(formatPricingFetchFailure("OpenRouter", error));
openRouterFailed = true;
return new Map<string, OpenRouterPricingEntry>();
}),
fetchLiteLLMPricingCatalog(fetchImpl).catch((error: unknown) => {
fetchLiteLLMPricingCatalog(fetchImpl, params.signal).catch((error: unknown) => {
log.warn(formatPricingFetchFailure("LiteLLM", error));
litellmFailed = true;
return new Map<string, CachedModelPricing>() as LiteLLMPricingCatalog;
}),
]);
if (params.signal?.aborted) {
return;
}
const catalogByNormalizedId = new Map<string, OpenRouterPricingEntry>();
for (const entry of catalogById.values()) {
const normalizedId = canonicalizeOpenRouterLookupId(entry.id, normalizationOptions);
@@ -1226,7 +1259,7 @@ export async function refreshGatewayModelPricingCache(params: {
if (nextPricing.size === 0 && existingMeta.size > 0) {
// Both sources failed — retain the entire existing cache.
log.warn("Both pricing sources returned empty data — retaining existing cache");
scheduleRefresh({ config: params.config, fetchImpl });
scheduleRefresh({ ...params, fetchImpl });
return;
}
// Partial failure — back-fill missing models from the existing cache.
@@ -1244,8 +1277,11 @@ export async function refreshGatewayModelPricingCache(params: {
}
}
if (params.signal?.aborted) {
return;
}
replaceGatewayModelPricingCache(nextPricing);
scheduleRefresh({ config: params.config, fetchImpl });
scheduleRefresh({ ...params, fetchImpl });
})();
try {
@@ -1255,30 +1291,28 @@ export async function refreshGatewayModelPricingCache(params: {
}
}
export function startGatewayModelPricingRefresh(params: {
config: OpenClawConfig;
env?: NodeJS.ProcessEnv;
fetchImpl?: typeof fetch;
workspaceDir?: string;
pluginMetadataSnapshot?: PluginMetadataRegistryView;
pluginLookUpTable?: PluginMetadataRegistryView;
manifestRegistry?: PluginManifestRegistry;
}): () => void {
export function startGatewayModelPricingRefresh(
params: GatewayModelPricingRefreshParams,
): () => void {
if (!isGatewayModelPricingEnabled(params.config)) {
clearRefreshTimer();
return () => {};
}
let stopped = false;
const abortController = new AbortController();
queueMicrotask(() => {
if (stopped) {
return;
}
void refreshGatewayModelPricingCache(params).catch((error: unknown) => {
log.warn(`pricing bootstrap failed: ${String(error)}`);
});
void refreshGatewayModelPricingCache({ ...params, signal: abortController.signal }).catch(
(error: unknown) => {
log.warn(`pricing bootstrap failed: ${String(error)}`);
},
);
});
return () => {
stopped = true;
abortController.abort();
clearRefreshTimer();
};
}