mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 07:10:43 +00:00
fix(gateway): defer missed cron agent startup work
This commit is contained in:
@@ -17,6 +17,8 @@ Docs: https://docs.openclaw.ai
|
||||
### Fixes
|
||||
|
||||
- Browser/gateway: ignore Playwright dialog-close races from `Page.handleJavaScriptDialog` so browser automation no longer crashes the Gateway when a dialog disappears before Playwright accepts it. (#40067) Thanks @randyjtw.
|
||||
- Cron/Gateway: defer missed isolated agent-turn catch-up out of the channel startup window, so overdue cron work cannot starve Discord or Telegram while providers connect after a restart. Thanks @vincentkoc.
|
||||
- Plugins/runtime-deps: prune stale `openclaw-unknown-*` bundled runtime dependency roots during Gateway startup while keeping recent or locked roots, so old staging debris cannot keep growing across restarts. Thanks @vincentkoc.
|
||||
- Ollama: compose caller abort signals with guarded-fetch timeouts for native `/api/chat` streams, so `/stop` and early cancellation still interrupt local Ollama requests that also carry provider timeout budgets. Refs #74133. Thanks @obviyus.
|
||||
- Doctor/TTS: migrate legacy `messages.tts.enabled`, agent TTS, channel TTS, and voice-call plugin TTS toggles to `auto` mode during `openclaw doctor --fix`, matching the documented TTS config contract. Thanks @vincentkoc.
|
||||
- CLI/logs: fall back to the configured Gateway file log when implicit loopback Gateway connections close or time out before or during `logs.tail`, so `openclaw logs` still works while diagnosing local-model Gateway disconnects. Refs #74078. Thanks @sakalaboator.
|
||||
|
||||
@@ -45,6 +45,7 @@ Cron is the Gateway's built-in scheduler. It persists jobs, wakes the agent at t
|
||||
- After the split, older OpenClaw versions can read `jobs.json` but may treat jobs as fresh because runtime fields now live in `jobs-state.json`.
|
||||
- When `jobs.json` is edited while the Gateway is running or stopped, OpenClaw compares the changed schedule fields with pending runtime slot metadata and clears stale `nextRunAtMs` values. Pure formatting or key-order-only rewrites preserve the pending slot.
|
||||
- All cron executions create [background task](/automation/tasks) records.
|
||||
- On Gateway startup, overdue isolated agent-turn jobs are rescheduled out of the channel-connect window instead of replaying immediately, so Discord/Telegram startup and native-command setup stay responsive after restarts.
|
||||
- One-shot jobs (`--at`) auto-delete after success by default.
|
||||
- Isolated cron runs make a best-effort attempt to close tracked browser tabs/processes for their `cron:<jobId>` session when the run completes, so detached browser automation does not leave orphaned processes behind.
|
||||
- Isolated cron runs also guard against stale acknowledgement replies. If the first result is just an interim status update (`on it`, `pulling everything together`, and similar hints) and no descendant subagent run is still responsible for the final answer, OpenClaw re-prompts once for the actual result before delivery.
|
||||
|
||||
@@ -23,15 +23,24 @@ describe("CronService restart catch-up", () => {
|
||||
enqueueSystemEvent: ReturnType<typeof vi.fn>;
|
||||
requestHeartbeatNow: ReturnType<typeof vi.fn>;
|
||||
onEvent?: ReturnType<typeof vi.fn>;
|
||||
nowMs?: () => number;
|
||||
runIsolatedAgentJob?: ReturnType<typeof vi.fn>;
|
||||
startupDeferredMissedAgentJobDelayMs?: number;
|
||||
}) {
|
||||
return new CronService({
|
||||
storePath: params.storePath,
|
||||
cronEnabled: true,
|
||||
log: noopLogger,
|
||||
...(params.nowMs ? { nowMs: params.nowMs } : {}),
|
||||
enqueueSystemEvent: params.enqueueSystemEvent as never,
|
||||
requestHeartbeatNow: params.requestHeartbeatNow as never,
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })) as never,
|
||||
runIsolatedAgentJob:
|
||||
(params.runIsolatedAgentJob as never) ??
|
||||
(vi.fn(async () => ({ status: "ok" as const })) as never),
|
||||
onEvent: params.onEvent as ((evt: CronEvent) => void) | undefined,
|
||||
...(params.startupDeferredMissedAgentJobDelayMs !== undefined
|
||||
? { startupDeferredMissedAgentJobDelayMs: params.startupDeferredMissedAgentJobDelayMs }
|
||||
: {}),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -121,6 +130,55 @@ describe("CronService restart catch-up", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("defers overdue isolated agent-turn jobs during gateway startup", async () => {
|
||||
const store = await makeStorePath();
|
||||
const startNow = Date.parse("2025-12-13T17:00:00.000Z");
|
||||
const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" as const }));
|
||||
const enqueueSystemEvent = vi.fn();
|
||||
const requestHeartbeatNow = vi.fn();
|
||||
|
||||
await writeStoreJobs(store.storePath, [
|
||||
{
|
||||
id: "startup-isolated-agent",
|
||||
name: "startup isolated agent",
|
||||
enabled: true,
|
||||
createdAtMs: startNow - 120_000,
|
||||
updatedAtMs: startNow - 120_000,
|
||||
schedule: { kind: "every", everyMs: 60_000, anchorMs: startNow - 120_000 },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "agentTurn", message: "do work" },
|
||||
state: { nextRunAtMs: startNow - 60_000 },
|
||||
},
|
||||
]);
|
||||
|
||||
const cron = createRestartCronService({
|
||||
storePath: store.storePath,
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
runIsolatedAgentJob,
|
||||
nowMs: () => startNow,
|
||||
startupDeferredMissedAgentJobDelayMs: 120_000,
|
||||
});
|
||||
|
||||
try {
|
||||
await cron.start();
|
||||
|
||||
expect(runIsolatedAgentJob).not.toHaveBeenCalled();
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||
|
||||
const listedJobs = await cron.list({ includeDisabled: true });
|
||||
const updated = listedJobs.find((job) => job.id === "startup-isolated-agent");
|
||||
expect(updated?.state.lastStatus).toBeUndefined();
|
||||
expect(updated?.state.runningAtMs).toBeUndefined();
|
||||
expect(updated?.state.nextRunAtMs).toBe(startNow + 120_000);
|
||||
} finally {
|
||||
cron.stop();
|
||||
await store.cleanup();
|
||||
}
|
||||
});
|
||||
|
||||
it("marks interrupted recurring jobs failed instead of replaying them on startup", async () => {
|
||||
const dueAt = Date.parse("2025-12-13T16:00:00.000Z");
|
||||
const staleRunningAt = Date.parse("2025-12-13T16:30:00.000Z");
|
||||
|
||||
@@ -167,6 +167,7 @@ export async function start(state: CronServiceState) {
|
||||
|
||||
await runMissedJobs(state, {
|
||||
skipJobIds: interruptedJobIds.size > 0 ? interruptedJobIds : undefined,
|
||||
deferAgentTurnJobs: true,
|
||||
});
|
||||
|
||||
await locked(state, async () => {
|
||||
|
||||
@@ -64,6 +64,11 @@ export type CronServiceDeps = {
|
||||
* See: https://github.com/openclaw/openclaw/issues/18892
|
||||
*/
|
||||
maxMissedJobsPerRestart?: number;
|
||||
/**
|
||||
* Delay before replaying missed agent-turn jobs found during gateway startup.
|
||||
* Keeps model/tool bootstrap work out of the channel connect window.
|
||||
*/
|
||||
startupDeferredMissedAgentJobDelayMs?: number;
|
||||
enqueueSystemEvent: (
|
||||
text: string,
|
||||
opts?: { agentId?: string; sessionKey?: string; contextKey?: string; trusted?: boolean },
|
||||
|
||||
@@ -53,6 +53,7 @@ const MIN_REFIRE_GAP_MS = 2_000;
|
||||
|
||||
const DEFAULT_MISSED_JOB_STAGGER_MS = 5_000;
|
||||
const DEFAULT_MAX_MISSED_JOBS_PER_RESTART = 5;
|
||||
const DEFAULT_STARTUP_DEFERRED_MISSED_AGENT_JOB_DELAY_MS = 2 * 60_000;
|
||||
const DEFAULT_FAILURE_ALERT_AFTER = 2;
|
||||
const DEFAULT_FAILURE_ALERT_COOLDOWN_MS = 60 * 60_000; // 1 hour
|
||||
|
||||
@@ -82,9 +83,14 @@ type StartupCatchupCandidate = {
|
||||
job: CronJob;
|
||||
};
|
||||
|
||||
/**
 * A missed job that startup catch-up will not run immediately; it is
 * rescheduled to a later `nextRunAtMs` when the catch-up outcomes are applied.
 */
type StartupDeferredJob = {
|
||||
/** Id of the stored cron job to reschedule. */
jobId: string;
|
||||
/**
 * Extra delay (ms) added on top of the stagger when rescheduling. Set for
 * deferred agent-turn jobs; left undefined for plain overflow jobs, which are
 * only staggered.
 */
delayMs?: number;
|
||||
};
|
||||
|
||||
type StartupCatchupPlan = {
|
||||
candidates: StartupCatchupCandidate[];
|
||||
deferredJobIds: string[];
|
||||
deferredJobs: StartupDeferredJob[];
|
||||
};
|
||||
|
||||
export async function executeJobCoreWithTimeout(
|
||||
@@ -1038,10 +1044,10 @@ function collectRunnableJobs(
|
||||
|
||||
export async function runMissedJobs(
|
||||
state: CronServiceState,
|
||||
opts?: { skipJobIds?: ReadonlySet<string> },
|
||||
opts?: { skipJobIds?: ReadonlySet<string>; deferAgentTurnJobs?: boolean },
|
||||
) {
|
||||
const plan = await planStartupCatchup(state, opts);
|
||||
if (plan.candidates.length === 0 && plan.deferredJobIds.length === 0) {
|
||||
if (plan.candidates.length === 0 && plan.deferredJobs.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1051,7 +1057,7 @@ export async function runMissedJobs(
|
||||
|
||||
async function planStartupCatchup(
|
||||
state: CronServiceState,
|
||||
opts?: { skipJobIds?: ReadonlySet<string> },
|
||||
opts?: { skipJobIds?: ReadonlySet<string>; deferAgentTurnJobs?: boolean },
|
||||
): Promise<StartupCatchupPlan> {
|
||||
const maxImmediate = Math.max(
|
||||
0,
|
||||
@@ -1060,7 +1066,7 @@ async function planStartupCatchup(
|
||||
return locked(state, async () => {
|
||||
await ensureLoaded(state, { skipRecompute: true });
|
||||
if (!state.store) {
|
||||
return { candidates: [], deferredJobIds: [] };
|
||||
return { candidates: [], deferredJobs: [] };
|
||||
}
|
||||
|
||||
const now = state.deps.nowMs();
|
||||
@@ -1070,13 +1076,28 @@ async function planStartupCatchup(
|
||||
allowCronMissedRunByLastRun: true,
|
||||
});
|
||||
if (missed.length === 0) {
|
||||
return { candidates: [], deferredJobIds: [] };
|
||||
return { candidates: [], deferredJobs: [] };
|
||||
}
|
||||
const sorted = missed.toSorted(
|
||||
(a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0),
|
||||
);
|
||||
const startupCandidates = sorted.slice(0, maxImmediate);
|
||||
const deferred = sorted.slice(maxImmediate);
|
||||
const deferredAgentJobs = opts?.deferAgentTurnJobs
|
||||
? sorted.filter((job) => job.payload.kind === "agentTurn")
|
||||
: [];
|
||||
const startupEligible = opts?.deferAgentTurnJobs
|
||||
? sorted.filter((job) => job.payload.kind !== "agentTurn")
|
||||
: sorted;
|
||||
const startupCandidates = startupEligible.slice(0, maxImmediate);
|
||||
const deferredOverflow = startupEligible.slice(maxImmediate);
|
||||
const deferredAgentDelayMs = Math.max(
|
||||
0,
|
||||
state.deps.startupDeferredMissedAgentJobDelayMs ??
|
||||
DEFAULT_STARTUP_DEFERRED_MISSED_AGENT_JOB_DELAY_MS,
|
||||
);
|
||||
const deferred: StartupDeferredJob[] = [
|
||||
...deferredOverflow.map((job) => ({ jobId: job.id })),
|
||||
...deferredAgentJobs.map((job) => ({ jobId: job.id, delayMs: deferredAgentDelayMs })),
|
||||
];
|
||||
if (deferred.length > 0) {
|
||||
state.deps.log.info(
|
||||
{
|
||||
@@ -1087,6 +1108,16 @@ async function planStartupCatchup(
|
||||
"cron: staggering missed jobs to prevent gateway overload",
|
||||
);
|
||||
}
|
||||
if (deferredAgentJobs.length > 0) {
|
||||
state.deps.log.info(
|
||||
{
|
||||
count: deferredAgentJobs.length,
|
||||
jobIds: deferredAgentJobs.map((job) => job.id),
|
||||
delayMs: deferredAgentDelayMs,
|
||||
},
|
||||
"cron: deferring missed agent jobs until after gateway startup",
|
||||
);
|
||||
}
|
||||
if (startupCandidates.length > 0) {
|
||||
state.deps.log.info(
|
||||
{ count: startupCandidates.length, jobIds: startupCandidates.map((j) => j.id) },
|
||||
@@ -1101,7 +1132,7 @@ async function planStartupCatchup(
|
||||
|
||||
return {
|
||||
candidates: startupCandidates.map((job) => ({ jobId: job.id, job })),
|
||||
deferredJobIds: deferred.map((job) => job.id),
|
||||
deferredJobs: deferred,
|
||||
};
|
||||
});
|
||||
}
|
||||
@@ -1182,14 +1213,20 @@ async function applyStartupCatchupOutcomes(
|
||||
applyOutcomeToStoredJob(state, result);
|
||||
}
|
||||
|
||||
if (plan.deferredJobIds.length > 0) {
|
||||
if (plan.deferredJobs.length > 0) {
|
||||
const baseNow = state.deps.nowMs();
|
||||
let offset = staggerMs;
|
||||
for (const jobId of plan.deferredJobIds) {
|
||||
for (const deferred of plan.deferredJobs) {
|
||||
const jobId = deferred.jobId;
|
||||
const job = state.store.jobs.find((entry) => entry.id === jobId);
|
||||
if (!job || !isJobEnabled(job)) {
|
||||
continue;
|
||||
}
|
||||
if (typeof deferred.delayMs === "number") {
|
||||
job.state.nextRunAtMs = baseNow + deferred.delayMs + offset - staggerMs;
|
||||
offset += staggerMs;
|
||||
continue;
|
||||
}
|
||||
job.state.nextRunAtMs = baseNow + offset;
|
||||
offset += staggerMs;
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { resolveOpenClawPackageRootSync } from "../infra/openclaw-root.js";
|
||||
import {
|
||||
pruneUnknownBundledRuntimeDepsRoots,
|
||||
repairBundledRuntimeDepsInstallRootAsync,
|
||||
resolveBundledRuntimeDependencyPackageInstallRoot,
|
||||
scanBundledPluginRuntimeDeps,
|
||||
@@ -54,6 +55,15 @@ async function prestageGatewayBundledRuntimeDeps(params: {
|
||||
if (!packageRoot) {
|
||||
return;
|
||||
}
|
||||
const pruned = pruneUnknownBundledRuntimeDepsRoots({
|
||||
env: process.env,
|
||||
warn: (message) => params.log.warn(`[plugins] ${message}`),
|
||||
});
|
||||
if (pruned.removed > 0) {
|
||||
params.log.info(
|
||||
`[plugins] pruned stale bundled runtime deps roots (${pruned.removed} removed, ${pruned.skippedLocked} locked, ${pruned.scanned} scanned)`,
|
||||
);
|
||||
}
|
||||
let scanResult: ReturnType<typeof scanBundledPluginRuntimeDeps>;
|
||||
try {
|
||||
scanResult = scanBundledPluginRuntimeDeps({
|
||||
|
||||
@@ -20,6 +20,7 @@ import {
|
||||
installBundledRuntimeDepsAsync,
|
||||
isWritableDirectory,
|
||||
materializeBundledRuntimeMirrorDistFile,
|
||||
pruneUnknownBundledRuntimeDepsRoots,
|
||||
repairBundledRuntimeDepsInstallRootAsync,
|
||||
resolveBundledRuntimeDependencyInstallRoot,
|
||||
resolveBundledRuntimeDependencyInstallRootPlan,
|
||||
@@ -1887,6 +1888,44 @@ describe("ensureBundledPluginRuntimeDeps", () => {
|
||||
expect(path.basename(resolved).startsWith("openclaw-unknown-")).toBe(false);
|
||||
});
|
||||
|
||||
it("prunes stale unknown external runtime roots while keeping newest and locked roots", () => {
|
||||
const stageDir = makeTempDir();
|
||||
const nowMs = Date.parse("2026-04-29T08:00:00.000Z");
|
||||
const makeRoot = (name: string, ageMs: number, locked = false) => {
|
||||
const root = path.join(stageDir, name);
|
||||
fs.mkdirSync(root, { recursive: true });
|
||||
fs.writeFileSync(path.join(root, "marker"), "ok\n");
|
||||
if (locked) {
|
||||
const lockDir = path.join(root, ".openclaw-runtime-deps.lock");
|
||||
fs.mkdirSync(lockDir, { recursive: true });
|
||||
fs.writeFileSync(
|
||||
path.join(lockDir, "owner.json"),
|
||||
JSON.stringify({ pid: process.pid, createdAtMs: nowMs }),
|
||||
);
|
||||
}
|
||||
const mtime = new Date(nowMs - ageMs);
|
||||
fs.utimesSync(root, mtime, mtime);
|
||||
return root;
|
||||
};
|
||||
const newest = makeRoot("openclaw-unknown-newest", 1_000);
|
||||
const stale = makeRoot("openclaw-unknown-stale", 120_000);
|
||||
const locked = makeRoot("openclaw-unknown-locked", 120_000, true);
|
||||
const versioned = makeRoot("openclaw-2026.4.25-versioned", 120_000);
|
||||
|
||||
const result = pruneUnknownBundledRuntimeDepsRoots({
|
||||
env: { OPENCLAW_PLUGIN_STAGE_DIR: stageDir },
|
||||
nowMs,
|
||||
maxRootsToKeep: 1,
|
||||
minAgeMs: 60_000,
|
||||
});
|
||||
|
||||
expect(result).toEqual({ scanned: 3, removed: 1, skippedLocked: 1 });
|
||||
expect(fs.existsSync(newest)).toBe(true);
|
||||
expect(fs.existsSync(stale)).toBe(false);
|
||||
expect(fs.existsSync(locked)).toBe(true);
|
||||
expect(fs.existsSync(versioned)).toBe(true);
|
||||
});
|
||||
|
||||
it("links source-checkout runtime deps from the cache instead of copying them", () => {
|
||||
const packageRoot = makeTempDir();
|
||||
fs.writeFileSync(
|
||||
|
||||
@@ -67,6 +67,8 @@ const BUNDLED_RUNTIME_DEPS_LOCK_WAIT_MS = 100;
|
||||
const BUNDLED_RUNTIME_DEPS_LOCK_TIMEOUT_MS = 5 * 60_000;
|
||||
const BUNDLED_RUNTIME_DEPS_LOCK_STALE_MS = 10 * 60_000;
|
||||
const BUNDLED_RUNTIME_DEPS_OWNERLESS_LOCK_STALE_MS = 30_000;
|
||||
const DEFAULT_UNKNOWN_RUNTIME_DEPS_ROOTS_TO_KEEP = 20;
|
||||
const DEFAULT_UNKNOWN_RUNTIME_DEPS_MIN_AGE_MS = 10 * 60_000;
|
||||
const BUNDLED_RUNTIME_DEPS_INSTALL_PROGRESS_INTERVAL_MS = 5_000;
|
||||
const BUNDLED_RUNTIME_MIRROR_MATERIALIZED_EXTENSIONS = new Set([".cjs", ".js", ".mjs"]);
|
||||
const BUNDLED_EXTENSION_DIST_DIR = "extensions";
|
||||
@@ -1151,6 +1153,71 @@ function resolveBundledRuntimeDepsExternalBaseDirs(env: NodeJS.ProcessEnv): stri
|
||||
return [path.join(resolveStateDir(env, os.homedir), "plugin-runtime-deps")];
|
||||
}
|
||||
|
||||
/**
 * Deletes stale `openclaw-unknown-*` runtime-deps roots from every external
 * base directory resolved for `env`.
 *
 * Within each base directory the matching roots are ranked newest-first by
 * directory mtime. A root is retained when it is among the `maxRootsToKeep`
 * newest roots OR is younger than `minAgeMs`; only roots that fail both checks
 * are pruning candidates. A candidate whose lock directory exists and is not
 * reported stale is left in place and counted as `skippedLocked`.
 *
 * Base directories that cannot be read and roots that cannot be stat'ed are
 * skipped silently; a failed removal is reported through `warn` and not counted.
 *
 * @param params.env - Environment used to resolve base directories. Defaults to `process.env`.
 * @param params.nowMs - Reference time for age and lock-staleness checks. Defaults to `Date.now()`.
 * @param params.maxRootsToKeep - Number of newest roots always retained per base directory
 *   (clamped to >= 0). Defaults to `DEFAULT_UNKNOWN_RUNTIME_DEPS_ROOTS_TO_KEEP`.
 * @param params.minAgeMs - Roots younger than this are never pruned (clamped to >= 0).
 *   Defaults to `DEFAULT_UNKNOWN_RUNTIME_DEPS_MIN_AGE_MS`.
 * @param params.warn - Optional sink for removal failures.
 * @returns Totals across all base directories: matching roots seen, roots removed,
 *   and candidates skipped because they were locked.
 */
export function pruneUnknownBundledRuntimeDepsRoots(
  params: {
    env?: NodeJS.ProcessEnv;
    nowMs?: number;
    maxRootsToKeep?: number;
    minAgeMs?: number;
    warn?: (message: string) => void;
  } = {},
): { scanned: number; removed: number; skippedLocked: number } {
  const env = params.env ?? process.env;
  const nowMs = params.nowMs ?? Date.now();
  const maxRootsToKeep = Math.max(
    0,
    params.maxRootsToKeep ?? DEFAULT_UNKNOWN_RUNTIME_DEPS_ROOTS_TO_KEEP,
  );
  const minAgeMs = Math.max(0, params.minAgeMs ?? DEFAULT_UNKNOWN_RUNTIME_DEPS_MIN_AGE_MS);
  let scanned = 0;
  let removed = 0;
  let skippedLocked = 0;

  for (const baseDir of resolveBundledRuntimeDepsExternalBaseDirs(env)) {
    let entries: fs.Dirent[];
    try {
      entries = fs.readdirSync(baseDir, { withFileTypes: true });
    } catch {
      // Missing or unreadable base directory: nothing to prune here.
      continue;
    }
    const unknownRoots = entries
      .filter((entry) => entry.isDirectory() && entry.name.startsWith("openclaw-unknown-"))
      .map((entry) => {
        const root = path.join(baseDir, entry.name);
        try {
          return { root, mtimeMs: fs.statSync(root).mtimeMs };
        } catch {
          // The root vanished between readdir and stat; ignore it.
          return null;
        }
      })
      .filter((entry): entry is { root: string; mtimeMs: number } => entry !== null)
      // Newest first, so the index doubles as the "keep window" rank.
      .toSorted((left, right) => right.mtimeMs - left.mtimeMs);
    scanned += unknownRoots.length;

    for (const [index, entry] of unknownRoots.entries()) {
      const ageMs = nowMs - entry.mtimeMs;
      // Keep the newest `maxRootsToKeep` roots and anything still fresh. Using
      // `&&` here would prune every root older than `minAgeMs` even inside the
      // keep window, and prune fresh roots that fall outside it.
      if (index < maxRootsToKeep || ageMs < minAgeMs) {
        continue;
      }
      const lockDir = path.join(entry.root, BUNDLED_RUNTIME_DEPS_LOCK_DIR);
      // A falsy result from the stale-lock helper is treated as "still locked".
      // NOTE(review): presumably the helper also clears the lock when stale — confirm.
      if (fs.existsSync(lockDir) && !removeRuntimeDepsLockIfStale(lockDir, nowMs)) {
        skippedLocked += 1;
        continue;
      }
      try {
        fs.rmSync(entry.root, { recursive: true, force: true });
        removed += 1;
      } catch (error) {
        params.warn?.(
          `failed to remove stale bundled runtime deps root ${entry.root}: ${String(error)}`,
        );
      }
    }
  }

  return { scanned, removed, skippedLocked };
}
|
||||
|
||||
function resolveExternalBundledRuntimeDepsInstallRoot(params: {
|
||||
pluginRoot: string;
|
||||
env: NodeJS.ProcessEnv;
|
||||
|
||||
Reference in New Issue
Block a user