diff --git a/CHANGELOG.md b/CHANGELOG.md index 545f60c0e6e..d45c97dc82c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Gateway/diagnostics: include a bounded redacted startup error message in stability bundles, so crash-loop reports identify the failing plugin or contract without exposing secrets. Refs #75797. Thanks @ymebosma. +- Gateway/pricing: abort in-flight model pricing catalog fetches when Gateway shutdown stops the refresh loop, and avoid post-stop cache writes or refresh timers. Fixes #72208. Thanks @rzcq. - Control UI/Talk: allow the OpenAI Realtime WebRTC offer endpoint through the Control UI CSP, configure browser sessions with explicit VAD/transcription input settings, and surface OpenAI realtime error/lifecycle events instead of leaving Talk stuck as live with no diagnostic. Fixes #73427. - Plugins: clarify config-selected duplicate plugin override diagnostics and document manifest schema updates for bundled-plugin forks. Fixes #8582. Thanks @sachah. - Providers/OpenAI: resolve `keychain::` `OPENAI_API_KEY` refs before creating OpenAI Realtime browser sessions or voice bridges, with a bounded cached Keychain lookup. Fixes #72120. Thanks @ctbritt. diff --git a/src/gateway/model-pricing-cache.test.ts b/src/gateway/model-pricing-cache.test.ts index 3010e71181e..1334d51e178 100644 --- a/src/gateway/model-pricing-cache.test.ts +++ b/src/gateway/model-pricing-cache.test.ts @@ -834,6 +834,54 @@ describe("model-pricing-cache", () => { stop(); }); + it("aborts in-flight bootstrap pricing fetches after stop", async () => { + const config = { + agents: { + defaults: { + model: { primary: "anthropic/claude-opus-4-6" }, + }, + }, + } as unknown as OpenClawConfig; + const abortedUrls: string[] = []; + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); + const fetchImpl = withFetchPreconnect( + vi.fn( + (input: RequestInfo | URL, init?: RequestInit) => + new Promise((_resolve, reject) => { + const url = + typeof input === "string" ? input : input instanceof URL ? input.href : input.url; + const signal = init?.signal; + expect(signal).toBeDefined(); + signal?.addEventListener( + "abort", + () => { + abortedUrls.push(url); + reject(signal.reason); + }, + { once: true }, + ); + }), + ), + ); + + try { + const stop = startGatewayModelPricingRefresh({ config, fetchImpl }); + + await vi.dynamicImportSettled(); + expect(fetchImpl).toHaveBeenCalledTimes(2); + stop(); + await vi.waitFor(() => expect(abortedUrls).toHaveLength(2)); + await vi.dynamicImportSettled(); + + expect(setTimeoutSpy.mock.calls.some(([, delay]) => delay === 24 * 60 * 60_000)).toBe(false); + expect( + getCachedGatewayModelPricing({ provider: "anthropic", model: "claude-opus-4-6" }), + ).toBeUndefined(); + } finally { + setTimeoutSpy.mockRestore(); + } + }); + it("does not bootstrap remote pricing when pricing is disabled", async () => { const config = { agents: { diff --git a/src/gateway/model-pricing-cache.ts b/src/gateway/model-pricing-cache.ts index fb56723c115..47f84b12165 100644 --- a/src/gateway/model-pricing-cache.ts +++ b/src/gateway/model-pricing-cache.ts @@ -50,6 +50,17 @@ type OpenRouterModelPayload = { pricing?: unknown; }; +type GatewayModelPricingRefreshParams = { + config: OpenClawConfig; + env?: NodeJS.ProcessEnv; + fetchImpl?: typeof fetch; + workspaceDir?: string; + pluginMetadataSnapshot?: PluginMetadataRegistryView; + pluginLookUpTable?: PluginMetadataRegistryView; + manifestRegistry?: PluginManifestRegistry; + signal?: AbortSignal; +}; + type ExternalPricingPolicy = { external: boolean; openRouter?: ExternalPricingSourcePolicy; @@ -143,6 +154,11 @@ function isTimeoutError(error: unknown): boolean { return /\bTimeoutError\b/u.test(String(error)); } +function createPricingFetchSignal(signal: AbortSignal | undefined): AbortSignal { + const timeoutSignal = AbortSignal.timeout(FETCH_TIMEOUT_MS); + return signal ? AbortSignal.any([signal, timeoutSignal]) : timeoutSignal; +} + function formatPricingFetchFailure(source: "LiteLLM" | "OpenRouter", error: unknown): string { if (isTimeoutError(error)) { return `${source} pricing fetch failed (timeout ${formatTimeoutSeconds(FETCH_TIMEOUT_MS)}): ${String(error)}`; @@ -326,10 +342,13 @@ function parseLiteLLMPricing(entry: LiteLLMModelEntry): CachedModelPricing | nul type LiteLLMPricingCatalog = Map; -async function fetchLiteLLMPricingCatalog(fetchImpl: typeof fetch): Promise { +async function fetchLiteLLMPricingCatalog( + fetchImpl: typeof fetch, + signal?: AbortSignal, +): Promise { const response = await fetchImpl(LITELLM_PRICING_URL, { headers: { Accept: "application/json" }, - signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + signal: createPricingFetchSignal(signal), }); if (!response.ok) { throw new Error(`LiteLLM pricing fetch failed: HTTP ${response.status}`); @@ -982,10 +1001,11 @@ export function collectConfiguredModelPricingRefs( async function fetchOpenRouterPricingCatalog( fetchImpl: typeof fetch, + signal?: AbortSignal, ): Promise> { const response = await fetchImpl(OPENROUTER_MODELS_URL, { headers: { Accept: "application/json" }, - signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + signal: createPricingFetchSignal(signal), }); if (!response.ok) { throw new Error(`OpenRouter /models failed: HTTP ${response.status}`); @@ -1064,14 +1084,23 @@ function resolveLiteLLMPricingForRef(params: { return undefined; } -function scheduleRefresh(params: { config: OpenClawConfig; fetchImpl: typeof fetch }): void { +function scheduleRefresh( + params: GatewayModelPricingRefreshParams & { fetchImpl: typeof fetch }, +): void { clearRefreshTimer(); + if (params.signal?.aborted) { + return; + } refreshTimer = setTimeout(() => { refreshTimer = null; + if (params.signal?.aborted) { + return; + } void refreshGatewayModelPricingCache(params).catch((error: unknown) => { log.warn(`pricing refresh failed: ${String(error)}`); }); }, CACHE_TTL_MS); + refreshTimer.unref?.(); } function collectSeededPricing(params: { @@ -1100,19 +1129,16 @@ function collectSeededPricing(params: { return seeded; } -export async function refreshGatewayModelPricingCache(params: { - config: OpenClawConfig; - env?: NodeJS.ProcessEnv; - fetchImpl?: typeof fetch; - workspaceDir?: string; - pluginMetadataSnapshot?: PluginMetadataRegistryView; - pluginLookUpTable?: PluginMetadataRegistryView; - manifestRegistry?: PluginManifestRegistry; -}): Promise { +export async function refreshGatewayModelPricingCache( + params: GatewayModelPricingRefreshParams, +): Promise { if (!isGatewayModelPricingEnabled(params.config)) { clearRefreshTimer(); return; } + if (params.signal?.aborted) { + return; + } if (inFlightRefresh) { return await inFlightRefresh; } @@ -1147,6 +1173,9 @@ export async function refreshGatewayModelPricingCache(params: { allowPluginNormalization: normalizationOptions.allowPluginNormalization, }); if (refs.length === 0) { + if (params.signal?.aborted) { + return; + } replaceGatewayModelPricingCache(seededPricing); clearRefreshTimer(); return; @@ -1157,18 +1186,22 @@ export async function refreshGatewayModelPricingCache(params: { let openRouterFailed = false; let litellmFailed = false; const [catalogById, litellmCatalog] = await Promise.all([ - fetchOpenRouterPricingCatalog(fetchImpl).catch((error: unknown) => { + fetchOpenRouterPricingCatalog(fetchImpl, params.signal).catch((error: unknown) => { log.warn(formatPricingFetchFailure("OpenRouter", error)); openRouterFailed = true; return new Map(); }), - fetchLiteLLMPricingCatalog(fetchImpl).catch((error: unknown) => { + fetchLiteLLMPricingCatalog(fetchImpl, params.signal).catch((error: unknown) => { log.warn(formatPricingFetchFailure("LiteLLM", error)); litellmFailed = true; return new Map() as LiteLLMPricingCatalog; }), ]); + if (params.signal?.aborted) { + return; + } + const catalogByNormalizedId = new Map(); for (const entry of catalogById.values()) { const normalizedId = canonicalizeOpenRouterLookupId(entry.id, normalizationOptions); @@ -1226,7 +1259,7 @@ export async function refreshGatewayModelPricingCache(params: { if (nextPricing.size === 0 && existingMeta.size > 0) { // Both sources failed — retain the entire existing cache. log.warn("Both pricing sources returned empty data — retaining existing cache"); - scheduleRefresh({ config: params.config, fetchImpl }); + scheduleRefresh({ ...params, fetchImpl }); return; } // Partial failure — back-fill missing models from the existing cache. @@ -1244,8 +1277,11 @@ export async function refreshGatewayModelPricingCache(params: { } } + if (params.signal?.aborted) { + return; + } replaceGatewayModelPricingCache(nextPricing); - scheduleRefresh({ config: params.config, fetchImpl }); + scheduleRefresh({ ...params, fetchImpl }); })(); try { @@ -1255,30 +1291,28 @@ export async function refreshGatewayModelPricingCache(params: { } } -export function startGatewayModelPricingRefresh(params: { - config: OpenClawConfig; - env?: NodeJS.ProcessEnv; - fetchImpl?: typeof fetch; - workspaceDir?: string; - pluginMetadataSnapshot?: PluginMetadataRegistryView; - pluginLookUpTable?: PluginMetadataRegistryView; - manifestRegistry?: PluginManifestRegistry; -}): () => void { +export function startGatewayModelPricingRefresh( + params: GatewayModelPricingRefreshParams, +): () => void { if (!isGatewayModelPricingEnabled(params.config)) { clearRefreshTimer(); return () => {}; } let stopped = false; + const abortController = new AbortController(); queueMicrotask(() => { if (stopped) { return; } - void refreshGatewayModelPricingCache(params).catch((error: unknown) => { - log.warn(`pricing bootstrap failed: ${String(error)}`); - }); + void refreshGatewayModelPricingCache({ ...params, signal: abortController.signal }).catch( + (error: unknown) => { + log.warn(`pricing bootstrap failed: ${String(error)}`); + }, + ); }); return () => { stopped = true; + abortController.abort(); clearRefreshTimer(); }; }