From 0669b0ddc265742009195eb9f1e9b6e93efb8c02 Mon Sep 17 00:00:00 2001 From: Altay Date: Tue, 10 Mar 2026 00:58:51 +0300 Subject: [PATCH] fix(agents): probe single-provider billing cooldowns (#41422) Merged via squash. Prepared head SHA: bbc4254b94559f95c34e11734a679cbe852aba52 Co-authored-by: altaywtf <9790196+altaywtf@users.noreply.github.com> Co-authored-by: altaywtf <9790196+altaywtf@users.noreply.github.com> Reviewed-by: @altaywtf --- CHANGELOG.md | 1 + src/agents/model-fallback.probe.test.ts | 60 ++++++++++++++---- src/agents/model-fallback.ts | 61 ++++++++++++++++--- ...pi-agent.auth-profile-rotation.e2e.test.ts | 48 +++++++++++++++ 4 files changed, 152 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dfa23b105af..8b140351b5d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ Docs: https://docs.openclaw.ai - ACP/tool streaming: enrich `tool_call` and `tool_call_update` events with best-effort text content and file-location hints so IDE clients can follow bridge tool activity more naturally. (#41442) Thanks @mbelinky. - ACP/runtime attachments: forward normalized inbound image attachments into ACP runtime turns so ACPX sessions can preserve image prompt content on the runtime path. (#41427) Thanks @mbelinky. - ACP/regressions: add gateway RPC coverage for ACP lineage patching, ACPX runtime coverage for image prompt serialization, and an operator smoke-test procedure for live ACP spawn verification. (#41456) Thanks @mbelinky. +- Agents/billing recovery: probe single-provider billing cooldowns on the existing throttle so topping up credits can recover without a manual gateway restart. (#41422) Thanks @altaywtf. 
## 2026.3.8 diff --git a/src/agents/model-fallback.probe.test.ts b/src/agents/model-fallback.probe.test.ts index 01bcb2dc3a8..9426eba6afc 100644 --- a/src/agents/model-fallback.probe.test.ts +++ b/src/agents/model-fallback.probe.test.ts @@ -251,6 +251,36 @@ describe("runWithModelFallback – probe logic", () => { expectPrimaryProbeSuccess(result, run, "probed-ok"); }); + it("prunes stale probe throttle entries before checking eligibility", () => { + _probeThrottleInternals.lastProbeAttempt.set( + "stale", + NOW - _probeThrottleInternals.PROBE_STATE_TTL_MS - 1, + ); + _probeThrottleInternals.lastProbeAttempt.set("fresh", NOW - 5_000); + + expect(_probeThrottleInternals.lastProbeAttempt.has("stale")).toBe(true); + + expect(_probeThrottleInternals.isProbeThrottleOpen(NOW, "fresh")).toBe(false); + + expect(_probeThrottleInternals.lastProbeAttempt.has("stale")).toBe(false); + expect(_probeThrottleInternals.lastProbeAttempt.has("fresh")).toBe(true); + }); + + it("caps probe throttle state by evicting the oldest entries", () => { + for (let i = 0; i < _probeThrottleInternals.MAX_PROBE_KEYS; i += 1) { + _probeThrottleInternals.lastProbeAttempt.set(`key-${i}`, NOW - (i + 1)); + } + + _probeThrottleInternals.markProbeAttempt(NOW, "freshest"); + + expect(_probeThrottleInternals.lastProbeAttempt.size).toBe( + _probeThrottleInternals.MAX_PROBE_KEYS, + ); + expect(_probeThrottleInternals.lastProbeAttempt.has("freshest")).toBe(true); + expect(_probeThrottleInternals.lastProbeAttempt.has("key-255")).toBe(false); + expect(_probeThrottleInternals.lastProbeAttempt.has("key-0")).toBe(true); + }); + it("handles non-finite soonest safely (treats as probe-worthy)", async () => { const cfg = makeCfg(); @@ -346,7 +376,7 @@ describe("runWithModelFallback – probe logic", () => { }); }); - it("skips billing-cooldowned primary when no fallback candidates exist", async () => { + it("probes billing-cooldowned primary when no fallback candidates exist", async () => { const cfg = makeCfg({ agents: { 
defaults: { @@ -358,20 +388,28 @@ describe("runWithModelFallback – probe logic", () => { }, } as Partial); - // Billing cooldown far from expiry — would normally be skipped + // Single-provider setups need periodic probes even when the billing + // cooldown is far from expiry, otherwise topping up credits never recovers + // without a restart. const expiresIn30Min = NOW + 30 * 60 * 1000; mockedGetSoonestCooldownExpiry.mockReturnValue(expiresIn30Min); mockedResolveProfilesUnavailableReason.mockReturnValue("billing"); - await expect( - runWithModelFallback({ - cfg, - provider: "openai", - model: "gpt-4.1-mini", - fallbacksOverride: [], - run: vi.fn().mockResolvedValue("billing-recovered"), - }), - ).rejects.toThrow("All models failed"); + const run = vi.fn().mockResolvedValue("billing-recovered"); + + const result = await runWithModelFallback({ + cfg, + provider: "openai", + model: "gpt-4.1-mini", + fallbacksOverride: [], + run, + }); + + expect(result.result).toBe("billing-recovered"); + expect(run).toHaveBeenCalledTimes(1); + expect(run).toHaveBeenCalledWith("openai", "gpt-4.1-mini", { + allowTransientCooldownProbe: true, + }); }); it("probes billing-cooldowned primary with fallbacks when near cooldown expiry", async () => { diff --git a/src/agents/model-fallback.ts b/src/agents/model-fallback.ts index ad2b5759233..b9ff9d668ff 100644 --- a/src/agents/model-fallback.ts +++ b/src/agents/model-fallback.ts @@ -342,12 +342,51 @@ const lastProbeAttempt = new Map(); const MIN_PROBE_INTERVAL_MS = 30_000; // 30 seconds between probes per key const PROBE_MARGIN_MS = 2 * 60 * 1000; const PROBE_SCOPE_DELIMITER = "::"; +const PROBE_STATE_TTL_MS = 24 * 60 * 60 * 1000; +const MAX_PROBE_KEYS = 256; function resolveProbeThrottleKey(provider: string, agentDir?: string): string { const scope = String(agentDir ?? "").trim(); return scope ? 
`${scope}${PROBE_SCOPE_DELIMITER}${provider}` : provider; } +function pruneProbeState(now: number): void { + for (const [key, ts] of lastProbeAttempt) { + if (!Number.isFinite(ts) || ts <= 0 || now - ts > PROBE_STATE_TTL_MS) { + lastProbeAttempt.delete(key); + } + } +} + +function enforceProbeStateCap(): void { + while (lastProbeAttempt.size > MAX_PROBE_KEYS) { + let oldestKey: string | null = null; + let oldestTs = Number.POSITIVE_INFINITY; + for (const [key, ts] of lastProbeAttempt) { + if (ts < oldestTs) { + oldestKey = key; + oldestTs = ts; + } + } + if (!oldestKey) { + break; + } + lastProbeAttempt.delete(oldestKey); + } +} + +function isProbeThrottleOpen(now: number, throttleKey: string): boolean { + pruneProbeState(now); + const lastProbe = lastProbeAttempt.get(throttleKey) ?? 0; + return now - lastProbe >= MIN_PROBE_INTERVAL_MS; +} + +function markProbeAttempt(now: number, throttleKey: string): void { + pruneProbeState(now); + lastProbeAttempt.set(throttleKey, now); + enforceProbeStateCap(); +} + function shouldProbePrimaryDuringCooldown(params: { isPrimary: boolean; hasFallbackCandidates: boolean; @@ -360,8 +399,7 @@ function shouldProbePrimaryDuringCooldown(params: { return false; } - const lastProbe = lastProbeAttempt.get(params.throttleKey) ?? 0; - if (params.now - lastProbe < MIN_PROBE_INTERVAL_MS) { + if (!isProbeThrottleOpen(params.now, params.throttleKey)) { return false; } @@ -379,7 +417,12 @@ export const _probeThrottleInternals = { lastProbeAttempt, MIN_PROBE_INTERVAL_MS, PROBE_MARGIN_MS, + PROBE_STATE_TTL_MS, + MAX_PROBE_KEYS, resolveProbeThrottleKey, + isProbeThrottleOpen, + pruneProbeState, + markProbeAttempt, } as const; type CooldownDecision = @@ -429,11 +472,15 @@ function resolveCooldownDecision(params: { } // Billing is semi-persistent: the user may fix their balance, or a transient - // 402 might have been misclassified. 
Probe the primary only when fallbacks - // exist; otherwise repeated single-provider probes just churn the disabled - // auth state without opening any recovery path. + // 402 might have been misclassified. Probe single-provider setups on the + // standard throttle so they can recover without a restart; when fallbacks + // exist, only probe near cooldown expiry so the fallback chain stays preferred. if (inferredReason === "billing") { - if (params.isPrimary && params.hasFallbackCandidates && shouldProbe) { + const shouldProbeSingleProviderBilling = + params.isPrimary && + !params.hasFallbackCandidates && + isProbeThrottleOpen(params.now, params.probeThrottleKey); + if (params.isPrimary && (shouldProbe || shouldProbeSingleProviderBilling)) { return { type: "attempt", reason: inferredReason, markProbe: true }; } return { @@ -528,7 +575,7 @@ export async function runWithModelFallback(params: { } if (decision.markProbe) { - lastProbeAttempt.set(probeThrottleKey, now); + markProbeAttempt(now, probeThrottleKey); } if ( decision.reason === "rate_limit" || diff --git a/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts b/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts index 75ce17eb197..432ae17daa1 100644 --- a/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts +++ b/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts @@ -1013,6 +1013,54 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { }); }); + it("can probe one billing-disabled profile when transient cooldown probe is allowed without fallback models", async () => { + await withTimedAgentWorkspace(async ({ agentDir, workspaceDir, now }) => { + await writeAuthStore(agentDir, { + usageStats: { + "openai:p1": { + lastUsed: 1, + disabledUntil: now + 60 * 60 * 1000, + disabledReason: "billing", + }, + "openai:p2": { + lastUsed: 2, + disabledUntil: now + 60 * 60 * 1000, + 
disabledReason: "billing", + }, + }, + }); + + runEmbeddedAttemptMock.mockResolvedValueOnce( + makeAttempt({ + assistantTexts: ["ok"], + lastAssistant: buildAssistant({ + stopReason: "stop", + content: [{ type: "text", text: "ok" }], + }), + }), + ); + + const result = await runEmbeddedPiAgent({ + sessionId: "session:test", + sessionKey: "agent:test:billing-cooldown-probe-no-fallbacks", + sessionFile: path.join(workspaceDir, "session.jsonl"), + workspaceDir, + agentDir, + config: makeConfig(), + prompt: "hello", + provider: "openai", + model: "mock-1", + authProfileIdSource: "auto", + allowTransientCooldownProbe: true, + timeoutMs: 5_000, + runId: "run:billing-cooldown-probe-no-fallbacks", + }); + + expect(runEmbeddedAttemptMock).toHaveBeenCalledTimes(1); + expect(result.payloads?.[0]?.text ?? "").toContain("ok"); + }); + }); + it("treats agent-level fallbacks as configured when defaults have none", async () => { await withTimedAgentWorkspace(async ({ agentDir, workspaceDir, now }) => { await writeAuthStore(agentDir, {