fix(agents): probe single-provider billing cooldowns (#41422)

Merged via squash.

Prepared head SHA: bbc4254b94
Co-authored-by: altaywtf <9790196+altaywtf@users.noreply.github.com>
Co-authored-by: altaywtf <9790196+altaywtf@users.noreply.github.com>
Reviewed-by: @altaywtf
This commit is contained in:
Altay
2026-03-10 00:58:51 +03:00
committed by GitHub
parent 0c7f07818f
commit 0669b0ddc2
4 changed files with 152 additions and 18 deletions

View File

@@ -32,6 +32,7 @@ Docs: https://docs.openclaw.ai
- ACP/tool streaming: enrich `tool_call` and `tool_call_update` events with best-effort text content and file-location hints so IDE clients can follow bridge tool activity more naturally. (#41442) Thanks @mbelinky. - ACP/tool streaming: enrich `tool_call` and `tool_call_update` events with best-effort text content and file-location hints so IDE clients can follow bridge tool activity more naturally. (#41442) Thanks @mbelinky.
- ACP/runtime attachments: forward normalized inbound image attachments into ACP runtime turns so ACPX sessions can preserve image prompt content on the runtime path. (#41427) Thanks @mbelinky. - ACP/runtime attachments: forward normalized inbound image attachments into ACP runtime turns so ACPX sessions can preserve image prompt content on the runtime path. (#41427) Thanks @mbelinky.
- ACP/regressions: add gateway RPC coverage for ACP lineage patching, ACPX runtime coverage for image prompt serialization, and an operator smoke-test procedure for live ACP spawn verification. (#41456) Thanks @mbelinky. - ACP/regressions: add gateway RPC coverage for ACP lineage patching, ACPX runtime coverage for image prompt serialization, and an operator smoke-test procedure for live ACP spawn verification. (#41456) Thanks @mbelinky.
- Agents/billing recovery: probe single-provider billing cooldowns on the existing throttle so topping up credits can recover without a manual gateway restart. (#41422) thanks @altaywtf.
## 2026.3.8 ## 2026.3.8

View File

@@ -251,6 +251,36 @@ describe("runWithModelFallback probe logic", () => {
expectPrimaryProbeSuccess(result, run, "probed-ok"); expectPrimaryProbeSuccess(result, run, "probed-ok");
}); });
it("prunes stale probe throttle entries before checking eligibility", () => {
_probeThrottleInternals.lastProbeAttempt.set(
"stale",
NOW - _probeThrottleInternals.PROBE_STATE_TTL_MS - 1,
);
_probeThrottleInternals.lastProbeAttempt.set("fresh", NOW - 5_000);
expect(_probeThrottleInternals.lastProbeAttempt.has("stale")).toBe(true);
expect(_probeThrottleInternals.isProbeThrottleOpen(NOW, "fresh")).toBe(false);
expect(_probeThrottleInternals.lastProbeAttempt.has("stale")).toBe(false);
expect(_probeThrottleInternals.lastProbeAttempt.has("fresh")).toBe(true);
});
it("caps probe throttle state by evicting the oldest entries", () => {
for (let i = 0; i < _probeThrottleInternals.MAX_PROBE_KEYS; i += 1) {
_probeThrottleInternals.lastProbeAttempt.set(`key-${i}`, NOW - (i + 1));
}
_probeThrottleInternals.markProbeAttempt(NOW, "freshest");
expect(_probeThrottleInternals.lastProbeAttempt.size).toBe(
_probeThrottleInternals.MAX_PROBE_KEYS,
);
expect(_probeThrottleInternals.lastProbeAttempt.has("freshest")).toBe(true);
expect(_probeThrottleInternals.lastProbeAttempt.has("key-255")).toBe(false);
expect(_probeThrottleInternals.lastProbeAttempt.has("key-0")).toBe(true);
});
it("handles non-finite soonest safely (treats as probe-worthy)", async () => { it("handles non-finite soonest safely (treats as probe-worthy)", async () => {
const cfg = makeCfg(); const cfg = makeCfg();
@@ -346,7 +376,7 @@ describe("runWithModelFallback probe logic", () => {
}); });
}); });
it("skips billing-cooldowned primary when no fallback candidates exist", async () => { it("probes billing-cooldowned primary when no fallback candidates exist", async () => {
const cfg = makeCfg({ const cfg = makeCfg({
agents: { agents: {
defaults: { defaults: {
@@ -358,20 +388,28 @@ describe("runWithModelFallback probe logic", () => {
}, },
} as Partial<OpenClawConfig>); } as Partial<OpenClawConfig>);
// Billing cooldown far from expiry — would normally be skipped // Single-provider setups need periodic probes even when the billing
// cooldown is far from expiry, otherwise topping up credits never recovers
// without a restart.
const expiresIn30Min = NOW + 30 * 60 * 1000; const expiresIn30Min = NOW + 30 * 60 * 1000;
mockedGetSoonestCooldownExpiry.mockReturnValue(expiresIn30Min); mockedGetSoonestCooldownExpiry.mockReturnValue(expiresIn30Min);
mockedResolveProfilesUnavailableReason.mockReturnValue("billing"); mockedResolveProfilesUnavailableReason.mockReturnValue("billing");
await expect( const run = vi.fn().mockResolvedValue("billing-recovered");
runWithModelFallback({
cfg, const result = await runWithModelFallback({
provider: "openai", cfg,
model: "gpt-4.1-mini", provider: "openai",
fallbacksOverride: [], model: "gpt-4.1-mini",
run: vi.fn().mockResolvedValue("billing-recovered"), fallbacksOverride: [],
}), run,
).rejects.toThrow("All models failed"); });
expect(result.result).toBe("billing-recovered");
expect(run).toHaveBeenCalledTimes(1);
expect(run).toHaveBeenCalledWith("openai", "gpt-4.1-mini", {
allowTransientCooldownProbe: true,
});
}); });
it("probes billing-cooldowned primary with fallbacks when near cooldown expiry", async () => { it("probes billing-cooldowned primary with fallbacks when near cooldown expiry", async () => {

View File

@@ -342,12 +342,51 @@ const lastProbeAttempt = new Map<string, number>();
const MIN_PROBE_INTERVAL_MS = 30_000; // 30 seconds between probes per key const MIN_PROBE_INTERVAL_MS = 30_000; // 30 seconds between probes per key
const PROBE_MARGIN_MS = 2 * 60 * 1000; const PROBE_MARGIN_MS = 2 * 60 * 1000;
const PROBE_SCOPE_DELIMITER = "::"; const PROBE_SCOPE_DELIMITER = "::";
const PROBE_STATE_TTL_MS = 24 * 60 * 60 * 1000;
const MAX_PROBE_KEYS = 256;
function resolveProbeThrottleKey(provider: string, agentDir?: string): string { function resolveProbeThrottleKey(provider: string, agentDir?: string): string {
const scope = String(agentDir ?? "").trim(); const scope = String(agentDir ?? "").trim();
return scope ? `${scope}${PROBE_SCOPE_DELIMITER}${provider}` : provider; return scope ? `${scope}${PROBE_SCOPE_DELIMITER}${provider}` : provider;
} }
function pruneProbeState(now: number): void {
for (const [key, ts] of lastProbeAttempt) {
if (!Number.isFinite(ts) || ts <= 0 || now - ts > PROBE_STATE_TTL_MS) {
lastProbeAttempt.delete(key);
}
}
}
function enforceProbeStateCap(): void {
while (lastProbeAttempt.size > MAX_PROBE_KEYS) {
let oldestKey: string | null = null;
let oldestTs = Number.POSITIVE_INFINITY;
for (const [key, ts] of lastProbeAttempt) {
if (ts < oldestTs) {
oldestKey = key;
oldestTs = ts;
}
}
if (!oldestKey) {
break;
}
lastProbeAttempt.delete(oldestKey);
}
}
function isProbeThrottleOpen(now: number, throttleKey: string): boolean {
pruneProbeState(now);
const lastProbe = lastProbeAttempt.get(throttleKey) ?? 0;
return now - lastProbe >= MIN_PROBE_INTERVAL_MS;
}
function markProbeAttempt(now: number, throttleKey: string): void {
pruneProbeState(now);
lastProbeAttempt.set(throttleKey, now);
enforceProbeStateCap();
}
function shouldProbePrimaryDuringCooldown(params: { function shouldProbePrimaryDuringCooldown(params: {
isPrimary: boolean; isPrimary: boolean;
hasFallbackCandidates: boolean; hasFallbackCandidates: boolean;
@@ -360,8 +399,7 @@ function shouldProbePrimaryDuringCooldown(params: {
return false; return false;
} }
const lastProbe = lastProbeAttempt.get(params.throttleKey) ?? 0; if (!isProbeThrottleOpen(params.now, params.throttleKey)) {
if (params.now - lastProbe < MIN_PROBE_INTERVAL_MS) {
return false; return false;
} }
@@ -379,7 +417,12 @@ export const _probeThrottleInternals = {
lastProbeAttempt, lastProbeAttempt,
MIN_PROBE_INTERVAL_MS, MIN_PROBE_INTERVAL_MS,
PROBE_MARGIN_MS, PROBE_MARGIN_MS,
PROBE_STATE_TTL_MS,
MAX_PROBE_KEYS,
resolveProbeThrottleKey, resolveProbeThrottleKey,
isProbeThrottleOpen,
pruneProbeState,
markProbeAttempt,
} as const; } as const;
type CooldownDecision = type CooldownDecision =
@@ -429,11 +472,15 @@ function resolveCooldownDecision(params: {
} }
// Billing is semi-persistent: the user may fix their balance, or a transient // Billing is semi-persistent: the user may fix their balance, or a transient
// 402 might have been misclassified. Probe the primary only when fallbacks // 402 might have been misclassified. Probe single-provider setups on the
// exist; otherwise repeated single-provider probes just churn the disabled // standard throttle so they can recover without a restart; when fallbacks
// auth state without opening any recovery path. // exist, only probe near cooldown expiry so the fallback chain stays preferred.
if (inferredReason === "billing") { if (inferredReason === "billing") {
if (params.isPrimary && params.hasFallbackCandidates && shouldProbe) { const shouldProbeSingleProviderBilling =
params.isPrimary &&
!params.hasFallbackCandidates &&
isProbeThrottleOpen(params.now, params.probeThrottleKey);
if (params.isPrimary && (shouldProbe || shouldProbeSingleProviderBilling)) {
return { type: "attempt", reason: inferredReason, markProbe: true }; return { type: "attempt", reason: inferredReason, markProbe: true };
} }
return { return {
@@ -528,7 +575,7 @@ export async function runWithModelFallback<T>(params: {
} }
if (decision.markProbe) { if (decision.markProbe) {
lastProbeAttempt.set(probeThrottleKey, now); markProbeAttempt(now, probeThrottleKey);
} }
if ( if (
decision.reason === "rate_limit" || decision.reason === "rate_limit" ||

View File

@@ -1013,6 +1013,54 @@ describe("runEmbeddedPiAgent auth profile rotation", () => {
}); });
}); });
it("can probe one billing-disabled profile when transient cooldown probe is allowed without fallback models", async () => {
await withTimedAgentWorkspace(async ({ agentDir, workspaceDir, now }) => {
await writeAuthStore(agentDir, {
usageStats: {
"openai:p1": {
lastUsed: 1,
disabledUntil: now + 60 * 60 * 1000,
disabledReason: "billing",
},
"openai:p2": {
lastUsed: 2,
disabledUntil: now + 60 * 60 * 1000,
disabledReason: "billing",
},
},
});
runEmbeddedAttemptMock.mockResolvedValueOnce(
makeAttempt({
assistantTexts: ["ok"],
lastAssistant: buildAssistant({
stopReason: "stop",
content: [{ type: "text", text: "ok" }],
}),
}),
);
const result = await runEmbeddedPiAgent({
sessionId: "session:test",
sessionKey: "agent:test:billing-cooldown-probe-no-fallbacks",
sessionFile: path.join(workspaceDir, "session.jsonl"),
workspaceDir,
agentDir,
config: makeConfig(),
prompt: "hello",
provider: "openai",
model: "mock-1",
authProfileIdSource: "auto",
allowTransientCooldownProbe: true,
timeoutMs: 5_000,
runId: "run:billing-cooldown-probe-no-fallbacks",
});
expect(runEmbeddedAttemptMock).toHaveBeenCalledTimes(1);
expect(result.payloads?.[0]?.text ?? "").toContain("ok");
});
});
it("treats agent-level fallbacks as configured when defaults have none", async () => { it("treats agent-level fallbacks as configured when defaults have none", async () => {
await withTimedAgentWorkspace(async ({ agentDir, workspaceDir, now }) => { await withTimedAgentWorkspace(async ({ agentDir, workspaceDir, now }) => {
await writeAuthStore(agentDir, { await writeAuthStore(agentDir, {