fix(cron): clean up timed out agent runs

This commit is contained in:
Peter Steinberger
2026-04-29 16:53:19 +01:00
parent c1a42dce86
commit 61d53f98d3
16 changed files with 280 additions and 35 deletions

View File

@@ -23,6 +23,7 @@ Docs: https://docs.openclaw.ai
- Build/Gateway: route restart, shutdown, respawn, diagnostics, command-queue cleanup, and runtime cleanup through one stable gateway lifecycle runtime entry so rebuilt packages do not strand long-running gateways on stale hashed chunks. Carries forward #73964. Thanks @pashpashpash.
- Memory/wiki: keep broad shared-source and generated related-link blocks from turning every page into a search hit, cap noisy backlinks, support all-term searches such as people-routing queries, and prefer readable page body snippets over generated metadata. Thanks @vincentkoc.
- Cron/Gateway: abort and bounded-clean up timed-out isolated agent turns before recording the timeout, so stale cron sessions cannot leave Discord or other chat lanes stuck in `processing` after a timeout. Thanks @vincentkoc.
- Agents/errors: suppress malformed streaming tool-call JSON fragments before they reach chat surfaces while preserving provider request-validation diagnostics. Fixes #59076; keeps #59080 as duplicate coverage. (#59118) Thanks @singleGanghood.
- CLI/models: restore provider-filtered `models list --all --provider <id>` rows for providers without manifest/static catalog coverage, including Anthropic and Amazon Bedrock, while keeping the compatibility fallback off expensive availability and resolver paths. Thanks @shakkernerd.
- CLI/models: move the OpenAI listable catalog into the plugin manifest so `models list --all --provider openai` uses the manifest fast path instead of loading provider runtime normalization hooks. Thanks @shakkernerd.

View File

@@ -51,6 +51,7 @@ Cron is the Gateway's built-in scheduler. It persists jobs, wakes the agent at t
- Isolated cron runs also guard against stale acknowledgement replies. If the first result is just an interim status update (`on it`, `pulling everything together`, and similar hints) and no descendant subagent run is still responsible for the final answer, OpenClaw re-prompts once for the actual result before delivery.
- Isolated cron runs prefer structured execution-denial metadata from the embedded run, then fall back to known final summary/output markers such as `SYSTEM_RUN_DENIED` and `INVALID_REQUEST`, so a blocked command is not reported as a green run.
- Isolated cron runs also treat run-level agent failures as job errors even when no reply payload is produced, so model/provider failures increment error counters and trigger failure notifications instead of clearing the job as successful.
- When an isolated agent-turn job reaches `timeoutSeconds`, cron aborts the underlying agent run and gives it a short cleanup window. If the run does not drain, Gateway-owned cleanup force-clears that run's session ownership before cron records the timeout, so queued chat work is not left behind a stale processing session.
<a id="maintenance"></a>

View File

@@ -162,6 +162,7 @@ surfaces, while Codex native hooks remain a separate lower-level Codex mechanism
- `agent.wait` default: 30s (just the wait). `timeoutMs` param overrides.
- Agent runtime: `agents.defaults.timeoutSeconds` default 172800s (48 hours); enforced in `runEmbeddedPiAgent` abort timer.
- Cron runtime: isolated agent-turn `timeoutSeconds` is owned by cron. The scheduler starts that timer when execution begins, aborts the underlying run at the configured deadline, then runs bounded cleanup before recording the timeout so a stale child session cannot keep the lane stuck.
- Stuck-session recovery: with diagnostics enabled, `diagnostics.stuckSessionWarnMs` detects long `processing` sessions. Active embedded runs, active reply operations, and active session-lane tasks remain warning-only by default; if diagnostics show no active work for the session, the watchdog releases the affected session lane so queued startup work can drain.
- Model idle timeout: OpenClaw aborts a model request when no response chunks arrive before the idle window. `models.providers.<id>.timeoutSeconds` extends this idle watchdog for slow local/self-hosted providers; otherwise OpenClaw uses `agents.defaults.timeoutSeconds` when configured, capped at 120s by default. Cron-triggered runs with no explicit model or agent timeout disable the idle watchdog and rely on the cron outer timeout.
- Provider HTTP request timeout: `models.providers.<id>.timeoutSeconds` applies to that provider's model HTTP fetches, including connect, headers, body, SDK request timeout, total guarded-fetch abort handling, and model stream idle watchdog. Use this for slow local/self-hosted providers such as Ollama before raising the whole agent runtime timeout.

View File

@@ -21,6 +21,7 @@ export {
runEmbeddedPiAgent as runEmbeddedAgent,
} from "./pi-embedded-runner/run.js";
export {
abortAndDrainEmbeddedPiRun,
abortEmbeddedPiRun,
abortEmbeddedPiRun as abortEmbeddedAgentRun,
isEmbeddedPiRunActive,

View File

@@ -2,6 +2,7 @@ import { importFreshModule } from "openclaw/plugin-sdk/test-fixtures";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
__testing,
abortAndDrainEmbeddedPiRun,
abortEmbeddedPiRun,
clearActiveEmbeddedRun,
consumeEmbeddedRunModelSwitch,
@@ -65,6 +66,32 @@ describe("pi-embedded runner run registry", () => {
expect(abortB).toHaveBeenCalledTimes(1);
});
it("force-clears an aborted run that does not drain", async () => {
vi.useFakeTimers();
try {
const abortRun = vi.fn();
setActiveEmbeddedRun("session-stuck", createRunHandle({ abort: abortRun }), "agent:main");
const resultPromise = abortAndDrainEmbeddedPiRun({
sessionId: "session-stuck",
sessionKey: "agent:main",
settleMs: 100,
forceClear: true,
reason: "test_timeout",
});
await vi.advanceTimersByTimeAsync(100);
const result = await resultPromise;
expect(result).toEqual({ aborted: true, drained: false, forceCleared: true });
expect(abortRun).toHaveBeenCalledTimes(1);
expect(isEmbeddedPiRunHandleActive("session-stuck")).toBe(false);
expect(resolveActiveEmbeddedRunHandleSessionId("agent:main")).toBeUndefined();
} finally {
await vi.runOnlyPendingTimersAsync();
vi.useRealTimers();
}
});
it("waits for active runs to drain", async () => {
vi.useFakeTimers();
try {

View File

@@ -310,6 +310,29 @@ export function waitForEmbeddedPiRunEnd(sessionId: string, timeoutMs = 15_000):
});
}
export type AbortAndDrainEmbeddedPiRunResult = {
aborted: boolean;
drained: boolean;
forceCleared: boolean;
};
export async function abortAndDrainEmbeddedPiRun(params: {
sessionId: string;
sessionKey?: string;
settleMs?: number;
forceClear?: boolean;
reason?: string;
}): Promise<AbortAndDrainEmbeddedPiRunResult> {
const settleMs = params.settleMs ?? 15_000;
const aborted = abortEmbeddedPiRun(params.sessionId);
const drained = aborted ? await waitForEmbeddedPiRunEnd(params.sessionId, settleMs) : false;
const forceCleared =
params.forceClear === true && (!aborted || !drained)
? forceClearEmbeddedPiRun(params.sessionId, params.sessionKey, params.reason)
: false;
return { aborted, drained, forceCleared };
}
function notifyEmbeddedRunEnded(sessionId: string) {
const waiters = EMBEDDED_RUN_WAITERS.get(sessionId);
if (!waiters || waiters.size === 0) {

View File

@@ -1,4 +1,5 @@
export {
abortAndDrainEmbeddedPiRun,
abortEmbeddedPiRun,
isEmbeddedPiRunActive,
isEmbeddedPiRunStreaming,

View File

@@ -9,6 +9,7 @@ export type {
EmbeddedPiRunResult,
} from "./pi-embedded-runner.js";
export {
abortAndDrainEmbeddedPiRun,
abortEmbeddedAgentRun,
abortEmbeddedPiRun,
compactEmbeddedAgentSession,

View File

@@ -10,6 +10,7 @@ import { stringifyRouteThreadId } from "../../plugin-sdk/channel-route.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import { resolveCronDeliveryPlan, type CronDeliveryPlan } from "../delivery-plan.js";
import type {
CronAgentExecutionStarted,
CronDeliveryTrace,
CronDeliveryTraceMessageTarget,
CronDeliveryTraceTarget,
@@ -424,7 +425,7 @@ type RunCronAgentTurnParams = {
message: string;
abortSignal?: AbortSignal;
signal?: AbortSignal;
onExecutionStarted?: () => void;
onExecutionStarted?: (info?: CronAgentExecutionStarted) => void;
sessionKey: string;
agentId?: string;
lane?: string;
@@ -1008,7 +1009,7 @@ export async function runCronIsolatedAgentTurn(params: {
message: string;
abortSignal?: AbortSignal;
signal?: AbortSignal;
onExecutionStarted?: () => void;
onExecutionStarted?: (info?: CronAgentExecutionStarted) => void;
sessionKey: string;
agentId?: string;
lane?: string;
@@ -1026,6 +1027,13 @@ export async function runCronIsolatedAgentTurn(params: {
if (!prepared.ok) {
return prepared.result;
}
const notifyExecutionStarted = () =>
params.onExecutionStarted?.({
jobId: params.job.id,
agentId: prepared.context.agentId,
sessionId: prepared.context.runSessionId,
sessionKey: prepared.context.runSessionKey,
});
try {
const { executeCronRun } = await loadCronExecutorRuntime();
@@ -1054,7 +1062,7 @@ export async function runCronIsolatedAgentTurn(params: {
commandBody: prepared.context.commandBody,
persistSessionEntry: prepared.context.persistSessionEntry,
abortSignal,
onExecutionStarted: params.onExecutionStarted,
onExecutionStarted: notifyExecutionStarted,
abortReason,
isAborted,
thinkLevel: prepared.context.thinkLevel,

View File

@@ -7,6 +7,7 @@ import type {
CronJobCreate,
CronJobPatch,
CronMessageChannel,
CronAgentExecutionStarted,
CronRunOutcome,
CronRunStatus,
CronRunTelemetry,
@@ -93,7 +94,7 @@ export type CronServiceDeps = {
job: CronJob;
message: string;
abortSignal?: AbortSignal;
onExecutionStarted?: () => void;
onExecutionStarted?: (info?: CronAgentExecutionStarted) => void;
}) => Promise<
{
summary?: string;
@@ -114,6 +115,11 @@ export type CronServiceDeps = {
} & CronRunOutcome &
CronRunTelemetry
>;
cleanupTimedOutAgentRun?: (params: {
job: CronJob;
timeoutMs: number;
execution?: CronAgentExecutionStarted;
}) => Promise<void>;
sendCronFailureAlert?: (params: {
job: CronJob;
text: string;

View File

@@ -13,7 +13,7 @@ import {
} from "../../../test/helpers/cron/service-regression-fixtures.js";
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import * as schedule from "../schedule.js";
import type { CronJob } from "../types.js";
import type { CronAgentExecutionStarted, CronJob } from "../types.js";
import { computeJobNextRunAtMs } from "./jobs.js";
import { createCronServiceState, type CronEvent } from "./state.js";
import {
@@ -1213,6 +1213,86 @@ describe("cron service timer regressions", () => {
}
});
it("cleans up timed-out isolated runs even when the runner ignores abort", async () => {
vi.useFakeTimers();
try {
const store = timerRegressionFixtures.makeStorePath();
const scheduledAt = Date.parse("2026-02-15T14:00:00.000Z");
const cronJob = createIsolatedRegressionJob({
id: "timeout-cleanup-stuck-run",
name: "timeout cleanup stuck run",
scheduledAt,
schedule: { kind: "at", at: new Date(scheduledAt).toISOString() },
payload: { kind: "agentTurn", message: "work", timeoutSeconds: 1 },
state: { nextRunAtMs: scheduledAt },
});
await writeCronJobs(store.storePath, [cronJob]);
vi.setSystemTime(scheduledAt);
let now = scheduledAt;
const started = createDeferred<void>();
let abortObserved = false;
const cleanupTimedOutAgentRun = vi.fn(async () => {});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
cleanupTimedOutAgentRun,
runIsolatedAgentJob: vi.fn(
async ({
abortSignal,
onExecutionStarted,
}: {
abortSignal?: AbortSignal;
onExecutionStarted?: (info?: CronAgentExecutionStarted) => void;
}) => {
onExecutionStarted?.({
jobId: "timeout-cleanup-stuck-run",
agentId: "main",
sessionId: "cron-run-session",
sessionKey: "agent:main:cron:timeout-cleanup-stuck-run:run:cron-run-session",
});
started.resolve();
abortSignal?.addEventListener(
"abort",
() => {
abortObserved = true;
},
{ once: true },
);
return await new Promise<never>(() => {});
},
),
});
const timerPromise = onTimer(state);
await started.promise;
await vi.advanceTimersByTimeAsync(1_100);
now += 1_100;
await timerPromise;
expect(abortObserved).toBe(true);
expect(cleanupTimedOutAgentRun).toHaveBeenCalledWith({
job: expect.objectContaining({ id: "timeout-cleanup-stuck-run" }),
timeoutMs: 1_000,
execution: {
jobId: "timeout-cleanup-stuck-run",
agentId: "main",
sessionId: "cron-run-session",
sessionKey: "agent:main:cron:timeout-cleanup-stuck-run:run:cron-run-session",
},
});
const job = state.store?.jobs.find((entry) => entry.id === "timeout-cleanup-stuck-run");
expect(job?.state.lastStatus).toBe("error");
expect(job?.state.lastError).toContain("timed out");
} finally {
vi.useRealTimers();
}
});
it("keeps state updates when cron next-run computation throws after a successful run (#30905)", () => {
const startedAt = Date.parse("2026-03-02T12:00:00.000Z");
const endedAt = startedAt + 50;

View File

@@ -17,6 +17,7 @@ import { resolveCronDeliveryPlan } from "../delivery-plan.js";
import { createCronExecutionId } from "../run-id.js";
import { sweepCronRunSessions } from "../session-reaper.js";
import type {
CronAgentExecutionStarted,
CronDeliveryStatus,
CronDeliveryTrace,
CronJob,
@@ -45,6 +46,7 @@ import { DEFAULT_JOB_TIMEOUT_MS, resolveCronJobTimeoutMs } from "./timeout-polic
export { DEFAULT_JOB_TIMEOUT_MS } from "./timeout-policy.js";
const MAX_TIMER_DELAY_MS = 60_000;
const CRON_TIMEOUT_CLEANUP_GUARD_MS = 20_000;
/**
* Minimum gap between consecutive fires of the same cron job. This is a
@@ -108,31 +110,48 @@ export async function executeJobCoreWithTimeout(
const runAbortController = new AbortController();
let timeoutId: NodeJS.Timeout | undefined;
let rejectTimeout: ((reason?: unknown) => void) | undefined;
const timeoutPromise = new Promise<never>((_, reject) => {
rejectTimeout = reject;
let activeExecution: CronAgentExecutionStarted | undefined;
const timeoutMarker = Symbol("cron-timeout");
let resolveTimeout: ((value: typeof timeoutMarker) => void) | undefined;
const timeoutPromise = new Promise<typeof timeoutMarker>((resolve) => {
resolveTimeout = resolve;
});
const startTimeout = () => {
if (timeoutId) {
return;
}
timeoutId = setTimeout(() => {
runAbortController.abort(timeoutErrorMessage());
rejectTimeout?.(new Error(timeoutErrorMessage()));
}, jobTimeoutMs);
};
const deferTimeoutUntilExecutionStart =
job.sessionTarget !== "main" && job.payload.kind === "agentTurn";
const startTimeout = () => {
if (!timeoutId) {
timeoutId = setTimeout(() => {
runAbortController.abort(timeoutErrorMessage());
resolveTimeout?.(timeoutMarker);
}, jobTimeoutMs);
}
};
const onExecutionStarted = (info?: CronAgentExecutionStarted) => {
activeExecution = info ?? activeExecution;
startTimeout();
};
const corePromise = executeJobCore(state, job, runAbortController.signal, {
onExecutionStarted: deferTimeoutUntilExecutionStart ? onExecutionStarted : undefined,
});
if (!deferTimeoutUntilExecutionStart) {
startTimeout();
}
void corePromise.catch((err) => {
if (runAbortController.signal.aborted) {
state.deps.log.warn(
{ jobId: job.id, err: String(err) },
"cron: job core rejected after timeout abort",
);
}
});
try {
return await Promise.race([
executeJobCore(state, job, runAbortController.signal, {
onExecutionStarted: deferTimeoutUntilExecutionStart ? startTimeout : undefined,
}),
timeoutPromise,
]);
const first = await Promise.race([corePromise, timeoutPromise]);
if (first !== timeoutMarker) {
return first;
}
await cleanupTimedOutCronAgentRun(state, job, jobTimeoutMs, activeExecution);
return { status: "error", error: timeoutErrorMessage() };
} finally {
if (timeoutId) {
clearTimeout(timeoutId);
@@ -140,6 +159,34 @@ export async function executeJobCoreWithTimeout(
}
}
async function cleanupTimedOutCronAgentRun(
state: CronServiceState,
job: CronJob,
timeoutMs: number,
execution?: CronAgentExecutionStarted,
): Promise<void> {
if (!state.deps.cleanupTimedOutAgentRun) {
return;
}
let settleTimer: NodeJS.Timeout | undefined;
const cleanupPromise = state.deps.cleanupTimedOutAgentRun({ job, timeoutMs, execution });
const settleTimeout = new Promise<void>((resolve) => {
settleTimer = setTimeout(resolve, CRON_TIMEOUT_CLEANUP_GUARD_MS);
});
try {
await Promise.race([cleanupPromise, settleTimeout]);
} catch (err) {
state.deps.log.warn(
{ jobId: job.id, err: String(err) },
"cron: timed-out agent cleanup failed",
);
} finally {
if (settleTimer) {
clearTimeout(settleTimer);
}
}
}
function resolveRunConcurrency(state: CronServiceState): number {
const raw = state.deps.cronConfig?.maxConcurrentRuns;
if (typeof raw !== "number" || !Number.isFinite(raw)) {
@@ -1260,7 +1307,7 @@ export async function executeJobCore(
job: CronJob,
abortSignal?: AbortSignal,
options?: {
onExecutionStarted?: () => void;
onExecutionStarted?: (info?: CronAgentExecutionStarted) => void;
},
): Promise<
CronRunOutcome &
@@ -1418,7 +1465,7 @@ async function executeDetachedCronJob(
abortSignal: AbortSignal | undefined,
resolveAbortError: () => { status: "error"; error: string },
options?: {
onExecutionStarted?: () => void;
onExecutionStarted?: (info?: CronAgentExecutionStarted) => void;
},
): Promise<
CronRunOutcome &

View File

@@ -98,6 +98,13 @@ export type CronRunOutcome = {
sessionKey?: string;
};
export type CronAgentExecutionStarted = {
jobId: string;
agentId?: string;
sessionId?: string;
sessionKey?: string;
};
export type CronFailureAlert = {
after?: number;
channel?: CronMessageChannel;

View File

@@ -1,4 +1,5 @@
import { resolveDefaultAgentId } from "../agents/agent-scope.js";
import { abortAndDrainEmbeddedPiRun } from "../agents/pi-embedded.js";
import { cleanupBrowserSessionsForLifecycleEnd } from "../browser-lifecycle-cleanup.js";
import type { CliDeps } from "../cli/deps.types.js";
import { getRuntimeConfig } from "../config/io.js";
@@ -307,6 +308,29 @@ export function buildGatewayCronService(params: {
});
}
},
cleanupTimedOutAgentRun: async ({ job, execution }) => {
if (!execution?.sessionId) {
return;
}
const result = await abortAndDrainEmbeddedPiRun({
sessionId: execution.sessionId,
sessionKey: execution.sessionKey,
settleMs: 15_000,
forceClear: true,
reason: "cron_timeout",
});
cronLogger.warn(
{
jobId: job.id,
sessionId: execution.sessionId,
sessionKey: execution.sessionKey,
aborted: result.aborted,
drained: result.drained,
forceCleared: result.forceCleared,
},
"cron: cleaned up timed-out agent run",
);
},
sendCronFailureAlert: async ({ job, text, channel, to, mode, accountId }) =>
await sendGatewayCronFailureAlert({
deps: params.deps,

View File

@@ -18,6 +18,23 @@ const mocks = vi.hoisted(() => ({
}));
vi.mock("../agents/pi-embedded-runner/runs.js", () => ({
abortAndDrainEmbeddedPiRun: async (params: {
sessionId: string;
sessionKey?: string;
settleMs?: number;
forceClear?: boolean;
reason?: string;
}) => {
const aborted = mocks.abortEmbeddedPiRun(params.sessionId);
const drained = aborted
? await mocks.waitForEmbeddedPiRunEnd(params.sessionId, params.settleMs)
: false;
const forceCleared =
params.forceClear === true && (!aborted || !drained)
? mocks.forceClearEmbeddedPiRun(params.sessionId, params.sessionKey, params.reason)
: false;
return { aborted, drained, forceCleared };
},
abortEmbeddedPiRun: mocks.abortEmbeddedPiRun,
forceClearEmbeddedPiRun: mocks.forceClearEmbeddedPiRun,
isEmbeddedPiRunActive: mocks.isEmbeddedPiRunActive,

View File

@@ -1,12 +1,10 @@
import { resolveEmbeddedSessionLane } from "../agents/pi-embedded-runner/lanes.js";
import {
abortEmbeddedPiRun,
forceClearEmbeddedPiRun,
abortAndDrainEmbeddedPiRun,
isEmbeddedPiRunActive,
isEmbeddedPiRunHandleActive,
resolveActiveEmbeddedRunSessionId,
resolveActiveEmbeddedRunHandleSessionId,
waitForEmbeddedPiRunEnd,
} from "../agents/pi-embedded-runner/runs.js";
import { getCommandLaneSnapshot, resetCommandLane } from "../process/command-queue.js";
import { diagnosticLogger as diag } from "./diagnostic-runtime.js";
@@ -62,13 +60,15 @@ export async function recoverStuckDiagnosticSession(
);
return;
}
aborted = abortEmbeddedPiRun(activeSessionId);
if (aborted) {
drained = await waitForEmbeddedPiRunEnd(activeSessionId, STUCK_SESSION_ABORT_SETTLE_MS);
}
if (!aborted || !drained) {
forceClearEmbeddedPiRun(activeSessionId, params.sessionKey, "stuck_recovery");
}
const result = await abortAndDrainEmbeddedPiRun({
sessionId: activeSessionId,
sessionKey: params.sessionKey,
settleMs: STUCK_SESSION_ABORT_SETTLE_MS,
forceClear: true,
reason: "stuck_recovery",
});
aborted = result.aborted;
drained = result.drained;
}
if (!activeSessionId && activeWorkSessionId && isEmbeddedPiRunActive(activeWorkSessionId)) {