mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-01 02:30:23 +00:00
fix: escalate to model fallback after rate-limit profile rotation cap (#58707)
* fix: escalate to model fallback after rate-limit profile rotation cap Per-model rate limits (e.g. Anthropic Sonnet-only quotas) are not relieved by rotating auth profiles — if all profiles share the same model quota, cycling between them loops forever without falling back to the next model in the configured fallbacks chain. Apply the same rotation-cap pattern introduced for overloaded_error (#58348) to rate_limit errors: - Add `rateLimitedProfileRotations` to auth.cooldowns config (default: 1) - After N profile rotations on a rate_limit error, throw FailoverError to trigger cross-provider model fallback - Add `resolveRateLimitProfileRotationLimit` helper following the same pattern as `resolveOverloadProfileRotationLimit` Fixes #58572 * fix: cap prompt-side rate-limit failover (#58707) (thanks @Forgely3D) * fix: restore latest-main gates for #58707 --------- Co-authored-by: Ember (Forgely3D) <ember@forgely.co> Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { applyJobPatch, createJob } from "./service/jobs.js";
|
||||
import { applyJobPatch, createJob, recomputeNextRuns } from "./service/jobs.js";
|
||||
import type { CronServiceState } from "./service/state.js";
|
||||
import { DEFAULT_TOP_OF_HOUR_STAGGER_MS } from "./stagger.js";
|
||||
import type { CronJob, CronJobPatch } from "./types.js";
|
||||
@@ -538,3 +538,33 @@ describe("createJob delivery defaults", () => {
|
||||
expect(job.delivery).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe("recomputeNextRuns", () => {
|
||||
it("backfills missing every anchorMs for legacy loaded jobs", () => {
|
||||
const now = Date.parse("2026-03-01T12:00:00.000Z");
|
||||
const createdAtMs = now - 120_000;
|
||||
const job: CronJob = {
|
||||
id: "legacy-every",
|
||||
name: "legacy-every",
|
||||
enabled: true,
|
||||
createdAtMs,
|
||||
updatedAtMs: createdAtMs,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
state: {},
|
||||
};
|
||||
const state = {
|
||||
...createMockState(now),
|
||||
store: { version: 1 as const, jobs: [job] },
|
||||
} as CronServiceState;
|
||||
|
||||
expect(recomputeNextRuns(state)).toBe(true);
|
||||
expect(job.schedule.kind).toBe("every");
|
||||
if (job.schedule.kind === "every") {
|
||||
expect(job.schedule.anchorMs).toBe(createdAtMs);
|
||||
}
|
||||
expect(job.state.nextRunAtMs).toBe(now);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -238,8 +238,12 @@ export function findJobOrThrow(state: CronServiceState, id: string) {
|
||||
return job;
|
||||
}
|
||||
|
||||
export function isJobEnabled(job: Pick<CronJob, "enabled">): boolean {
|
||||
return job.enabled ?? true;
|
||||
}
|
||||
|
||||
export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | undefined {
|
||||
if (!job.enabled) {
|
||||
if (!isJobEnabled(job)) {
|
||||
return undefined;
|
||||
}
|
||||
if (job.schedule.kind === "every") {
|
||||
@@ -295,7 +299,7 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und
|
||||
}
|
||||
|
||||
export function computeJobPreviousRunAtMs(job: CronJob, nowMs: number): number | undefined {
|
||||
if (!job.enabled || job.schedule.kind !== "cron") {
|
||||
if (!isJobEnabled(job) || job.schedule.kind !== "cron") {
|
||||
return undefined;
|
||||
}
|
||||
const previous = computeStaggeredCronPreviousRunAtMs(job, nowMs);
|
||||
@@ -359,7 +363,21 @@ function normalizeJobTickState(params: { state: CronServiceState; job: CronJob;
|
||||
changed = true;
|
||||
}
|
||||
|
||||
if (!job.enabled) {
|
||||
if (job.schedule.kind === "every") {
|
||||
const normalizedAnchorMs = resolveEveryAnchorMs({
|
||||
schedule: job.schedule,
|
||||
fallbackAnchorMs: isFiniteTimestamp(job.createdAtMs) ? job.createdAtMs : nowMs,
|
||||
});
|
||||
if (job.schedule.anchorMs !== normalizedAnchorMs) {
|
||||
job.schedule = {
|
||||
...job.schedule,
|
||||
anchorMs: normalizedAnchorMs,
|
||||
};
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!isJobEnabled(job)) {
|
||||
if (job.state.nextRunAtMs !== undefined) {
|
||||
job.state.nextRunAtMs = undefined;
|
||||
changed = true;
|
||||
@@ -840,7 +858,9 @@ export function isJobDue(job: CronJob, nowMs: number, opts: { forced: boolean })
|
||||
if (opts.forced) {
|
||||
return true;
|
||||
}
|
||||
return job.enabled && typeof job.state.nextRunAtMs === "number" && nowMs >= job.state.nextRunAtMs;
|
||||
return (
|
||||
isJobEnabled(job) && typeof job.state.nextRunAtMs === "number" && nowMs >= job.state.nextRunAtMs
|
||||
);
|
||||
}
|
||||
|
||||
export function resolveJobPayloadTextForMain(job: CronJob): string | undefined {
|
||||
|
||||
@@ -11,6 +11,7 @@ import {
|
||||
computeJobNextRunAtMs,
|
||||
createJob,
|
||||
findJobOrThrow,
|
||||
isJobEnabled,
|
||||
isJobDue,
|
||||
nextWakeAtMs,
|
||||
recomputeNextRuns,
|
||||
@@ -162,7 +163,7 @@ export async function list(state: CronServiceState, opts?: { includeDisabled?: b
|
||||
return await locked(state, async () => {
|
||||
await ensureLoadedForRead(state);
|
||||
const includeDisabled = opts?.includeDisabled === true;
|
||||
const jobs = (state.store?.jobs ?? []).filter((j) => includeDisabled || j.enabled);
|
||||
const jobs = (state.store?.jobs ?? []).filter((j) => includeDisabled || isJobEnabled(j));
|
||||
return jobs.toSorted((a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0));
|
||||
});
|
||||
}
|
||||
@@ -215,10 +216,10 @@ export async function listPage(state: CronServiceState, opts?: CronListPageOptio
|
||||
const sortDir = opts?.sortDir ?? "asc";
|
||||
const source = state.store?.jobs ?? [];
|
||||
const filtered = source.filter((job) => {
|
||||
if (enabledFilter === "enabled" && !job.enabled) {
|
||||
if (enabledFilter === "enabled" && !isJobEnabled(job)) {
|
||||
return false;
|
||||
}
|
||||
if (enabledFilter === "disabled" && job.enabled) {
|
||||
if (enabledFilter === "disabled" && isJobEnabled(job)) {
|
||||
return false;
|
||||
}
|
||||
if (!query) {
|
||||
@@ -307,13 +308,13 @@ export async function update(state: CronServiceState, id: string, patch: CronJob
|
||||
|
||||
job.updatedAtMs = now;
|
||||
if (scheduleChanged || enabledChanged) {
|
||||
if (job.enabled) {
|
||||
if (isJobEnabled(job)) {
|
||||
job.state.nextRunAtMs = computeJobNextRunAtMs(job, now);
|
||||
} else {
|
||||
job.state.nextRunAtMs = undefined;
|
||||
job.state.runningAtMs = undefined;
|
||||
}
|
||||
} else if (job.enabled) {
|
||||
} else if (isJobEnabled(job)) {
|
||||
// Non-schedule edits should not mutate other jobs, but still repair a
|
||||
// missing/corrupt nextRunAtMs for the updated job.
|
||||
const nextRun = job.state.nextRunAtMs;
|
||||
|
||||
@@ -20,6 +20,7 @@ import type {
|
||||
import {
|
||||
computeJobPreviousRunAtMs,
|
||||
computeJobNextRunAtMs,
|
||||
isJobEnabled,
|
||||
nextWakeAtMs,
|
||||
recomputeNextRunsForMaintenance,
|
||||
recordScheduleComputeError,
|
||||
@@ -499,7 +500,7 @@ export function applyJobResult(
|
||||
);
|
||||
}
|
||||
}
|
||||
} else if (result.status === "error" && job.enabled) {
|
||||
} else if (result.status === "error" && isJobEnabled(job)) {
|
||||
// Apply exponential backoff for errored jobs to prevent retry storms.
|
||||
const backoff = errorBackoffMs(job.state.consecutiveErrors ?? 1);
|
||||
let normalNext: number | undefined;
|
||||
@@ -527,7 +528,7 @@ export function applyJobResult(
|
||||
},
|
||||
"cron: applying error backoff",
|
||||
);
|
||||
} else if (job.enabled) {
|
||||
} else if (isJobEnabled(job)) {
|
||||
let naturalNext: number | undefined;
|
||||
try {
|
||||
naturalNext =
|
||||
@@ -836,7 +837,7 @@ function isRunnableJob(params: {
|
||||
if (!job.state) {
|
||||
job.state = {};
|
||||
}
|
||||
if (!job.enabled) {
|
||||
if (!isJobEnabled(job)) {
|
||||
return false;
|
||||
}
|
||||
if (params.skipJobIds?.has(job.id)) {
|
||||
@@ -853,7 +854,7 @@ function isRunnableJob(params: {
|
||||
const nextRun = job.state.nextRunAtMs;
|
||||
if (
|
||||
job.state.lastStatus === "error" &&
|
||||
job.enabled &&
|
||||
isJobEnabled(job) &&
|
||||
typeof nextRun === "number" &&
|
||||
typeof lastRun === "number" &&
|
||||
nextRun > lastRun
|
||||
@@ -1079,7 +1080,7 @@ async function applyStartupCatchupOutcomes(
|
||||
let offset = staggerMs;
|
||||
for (const jobId of plan.deferredJobIds) {
|
||||
const job = state.store.jobs.find((entry) => entry.id === jobId);
|
||||
if (!job || !job.enabled) {
|
||||
if (!job || !isJobEnabled(job)) {
|
||||
continue;
|
||||
}
|
||||
job.state.nextRunAtMs = baseNow + offset;
|
||||
|
||||
Reference in New Issue
Block a user