From bbb73d3171e59408de80f6bd0aaaf52408abacdf Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 4 Apr 2026 14:42:27 +0100 Subject: [PATCH] refactor: split isolated cron runner phases --- src/cron/isolated-agent/run-executor.ts | 348 +++++++ src/cron/isolated-agent/run.ts | 1227 ++++++++++------------- 2 files changed, 876 insertions(+), 699 deletions(-) create mode 100644 src/cron/isolated-agent/run-executor.ts diff --git a/src/cron/isolated-agent/run-executor.ts b/src/cron/isolated-agent/run-executor.ts new file mode 100644 index 00000000000..d8ffa3c040d --- /dev/null +++ b/src/cron/isolated-agent/run-executor.ts @@ -0,0 +1,348 @@ +import type { SkillSnapshot } from "../../agents/skills.js"; +import type { ThinkLevel, VerboseLevel } from "../../auto-reply/thinking.js"; +import type { OpenClawConfig } from "../../config/config.js"; +import type { AgentDefaultsConfig } from "../../config/types.agent-defaults.js"; +import type { CronJob } from "../types.js"; +import { resolveCronPayloadOutcome } from "./helpers.js"; +import { + countActiveDescendantRuns, + listDescendantRunsForRequester, + LiveSessionModelSwitchError, + getCliSessionId, + isCliProvider, + logWarn, + normalizeVerboseLevel, + registerAgentRunContext, + resolveBootstrapWarningSignaturesSeen, + resolveFastModeState, + resolveNestedAgentLane, + resolveSessionTranscriptPath, + runCliAgent, + runEmbeddedPiAgent, + runWithModelFallback, +} from "./run-execution.runtime.js"; +import { resolveCronFallbacksOverride } from "./run-fallback-policy.js"; +import type { + CronLiveSelection, + MutableCronSession, + PersistCronSessionEntry, +} from "./run-session-state.js"; +import { syncCronSessionLiveSelection } from "./run-session-state.js"; +import { isLikelyInterimCronMessage } from "./subagent-followup-hints.js"; + +type AgentTurnPayload = Extract | null; +type CronPromptRunResult = Awaited>; + +export type CronExecutionResult = { + runResult: CronPromptRunResult; + fallbackProvider: string; + fallbackModel: string; + runStartedAt: number; + runEndedAt: number; + liveSelection: CronLiveSelection; +}; + +export function createCronPromptExecutor(params: { + cfg: OpenClawConfig; + cfgWithAgentDefaults: OpenClawConfig; + job: CronJob; + agentId: string; + agentDir: string; + agentSessionKey: string; + workspaceDir: string; + lane?: string; + resolvedVerboseLevel: VerboseLevel; + thinkLevel: ThinkLevel | undefined; + timeoutMs: number; + messageChannel: string | undefined; + resolvedDelivery: { accountId?: string }; + toolPolicy: { + requireExplicitMessageTarget: boolean; + disableMessageTool: boolean; + }; + skillsSnapshot: SkillSnapshot; + agentPayload: AgentTurnPayload; + liveSelection: CronLiveSelection; + cronSession: MutableCronSession; + abortSignal?: AbortSignal; + abortReason: () => string; +}) { + const sessionFile = resolveSessionTranscriptPath( + params.cronSession.sessionEntry.sessionId, + params.agentId, + ); + const cronFallbacksOverride = resolveCronFallbacksOverride({ + cfg: params.cfg, + job: params.job, + agentId: params.agentId, + }); + let runResult: CronPromptRunResult | undefined; + let fallbackProvider = params.liveSelection.provider; + let fallbackModel = params.liveSelection.model; + let runEndedAt = Date.now(); + let bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( + params.cronSession.sessionEntry.systemPromptReport, + ); + + const runPrompt = async (promptText: string) => { + const fallbackResult = await runWithModelFallback({ + cfg: params.cfgWithAgentDefaults, + provider: params.liveSelection.provider, + model: params.liveSelection.model, + runId: params.cronSession.sessionEntry.sessionId, + agentDir: params.agentDir, + fallbacksOverride: cronFallbacksOverride, + run: async (providerOverride, modelOverride, runOptions) => { + if (params.abortSignal?.aborted) { + throw new Error(params.abortReason()); + } + const bootstrapPromptWarningSignature = + bootstrapPromptWarningSignaturesSeen[bootstrapPromptWarningSignaturesSeen.length - 1]; + if (isCliProvider(providerOverride, params.cfgWithAgentDefaults)) { + const cliSessionId = params.cronSession.isNewSession + ? undefined + : getCliSessionId(params.cronSession.sessionEntry, providerOverride); + const result = await runCliAgent({ + sessionId: params.cronSession.sessionEntry.sessionId, + sessionKey: params.agentSessionKey, + agentId: params.agentId, + sessionFile, + workspaceDir: params.workspaceDir, + config: params.cfgWithAgentDefaults, + prompt: promptText, + provider: providerOverride, + model: modelOverride, + thinkLevel: params.thinkLevel, + timeoutMs: params.timeoutMs, + runId: params.cronSession.sessionEntry.sessionId, + cliSessionId, + bootstrapPromptWarningSignaturesSeen, + bootstrapPromptWarningSignature, + }); + bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( + result.meta?.systemPromptReport, + ); + return result; + } + const result = await runEmbeddedPiAgent({ + sessionId: params.cronSession.sessionEntry.sessionId, + sessionKey: params.agentSessionKey, + agentId: params.agentId, + trigger: "cron", + allowGatewaySubagentBinding: true, + senderIsOwner: true, + messageChannel: params.messageChannel, + agentAccountId: params.resolvedDelivery.accountId, + sessionFile, + agentDir: params.agentDir, + workspaceDir: params.workspaceDir, + config: params.cfgWithAgentDefaults, + skillsSnapshot: params.skillsSnapshot, + prompt: promptText, + lane: resolveNestedAgentLane(params.lane), + provider: providerOverride, + model: modelOverride, + authProfileId: params.liveSelection.authProfileId, + authProfileIdSource: params.liveSelection.authProfileId + ? params.liveSelection.authProfileIdSource + : undefined, + thinkLevel: params.thinkLevel, + fastMode: resolveFastModeState({ + cfg: params.cfgWithAgentDefaults, + provider: providerOverride, + model: modelOverride, + agentId: params.agentId, + sessionEntry: params.cronSession.sessionEntry, + }).enabled, + verboseLevel: params.resolvedVerboseLevel, + timeoutMs: params.timeoutMs, + bootstrapContextMode: params.agentPayload?.lightContext ? "lightweight" : undefined, + bootstrapContextRunKind: "cron", + toolsAllow: params.agentPayload?.toolsAllow, + runId: params.cronSession.sessionEntry.sessionId, + requireExplicitMessageTarget: params.toolPolicy.requireExplicitMessageTarget, + disableMessageTool: params.toolPolicy.disableMessageTool, + allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe, + abortSignal: params.abortSignal, + bootstrapPromptWarningSignaturesSeen, + bootstrapPromptWarningSignature, + }); + bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( + result.meta?.systemPromptReport, + ); + return result; + }, + }); + runResult = fallbackResult.result; + fallbackProvider = fallbackResult.provider; + fallbackModel = fallbackResult.model; + params.liveSelection.provider = fallbackResult.provider; + params.liveSelection.model = fallbackResult.model; + runEndedAt = Date.now(); + }; + + return { + runPrompt, + getState: () => ({ + runResult, + fallbackProvider, + fallbackModel, + runEndedAt, + liveSelection: params.liveSelection, + }), + }; +} + +export async function executeCronRun(params: { + cfg: OpenClawConfig; + cfgWithAgentDefaults: OpenClawConfig; + job: CronJob; + agentId: string; + agentDir: string; + agentSessionKey: string; + workspaceDir: string; + lane?: string; + resolvedDelivery: { + channel?: string; + accountId?: string; + }; + toolPolicy: { + requireExplicitMessageTarget: boolean; + disableMessageTool: boolean; + }; + skillsSnapshot: SkillSnapshot; + agentPayload: AgentTurnPayload; + agentVerboseDefault: AgentDefaultsConfig["verboseDefault"]; + liveSelection: CronLiveSelection; + cronSession: MutableCronSession; + commandBody: string; + persistSessionEntry: PersistCronSessionEntry; + abortSignal?: AbortSignal; + abortReason: () => string; + isAborted: () => boolean; + thinkLevel: ThinkLevel | undefined; + timeoutMs: number; + runStartedAt?: number; +}): Promise { + const resolvedVerboseLevel: VerboseLevel = + normalizeVerboseLevel(params.cronSession.sessionEntry.verboseLevel) ?? + normalizeVerboseLevel(params.agentVerboseDefault) ?? + "off"; + registerAgentRunContext(params.cronSession.sessionEntry.sessionId, { + sessionKey: params.agentSessionKey, + verboseLevel: resolvedVerboseLevel, + }); + const executor = createCronPromptExecutor({ + cfg: params.cfg, + cfgWithAgentDefaults: params.cfgWithAgentDefaults, + job: params.job, + agentId: params.agentId, + agentDir: params.agentDir, + agentSessionKey: params.agentSessionKey, + workspaceDir: params.workspaceDir, + lane: params.lane, + resolvedVerboseLevel, + thinkLevel: params.thinkLevel, + timeoutMs: params.timeoutMs, + messageChannel: params.resolvedDelivery.channel, + resolvedDelivery: params.resolvedDelivery, + toolPolicy: params.toolPolicy, + skillsSnapshot: params.skillsSnapshot, + agentPayload: params.agentPayload, + liveSelection: params.liveSelection, + cronSession: params.cronSession, + abortSignal: params.abortSignal, + abortReason: params.abortReason, + }); + + const runStartedAt = params.runStartedAt ?? Date.now(); + const MAX_MODEL_SWITCH_RETRIES = 2; + let modelSwitchRetries = 0; + while (true) { + try { + await executor.runPrompt(params.commandBody); + break; + } catch (err) { + if (!(err instanceof LiveSessionModelSwitchError)) { + throw err; + } + modelSwitchRetries += 1; + if (modelSwitchRetries > MAX_MODEL_SWITCH_RETRIES) { + logWarn( + `[cron:${params.job.id}] LiveSessionModelSwitchError retry limit reached (${MAX_MODEL_SWITCH_RETRIES}); aborting`, + ); + throw err; + } + params.liveSelection.provider = err.provider; + params.liveSelection.model = err.model; + params.liveSelection.authProfileId = err.authProfileId; + params.liveSelection.authProfileIdSource = err.authProfileId + ? err.authProfileIdSource + : undefined; + syncCronSessionLiveSelection({ + entry: params.cronSession.sessionEntry, + liveSelection: params.liveSelection, + }); + try { + await params.persistSessionEntry(); + } catch (persistErr) { + logWarn( + `[cron:${params.job.id}] Failed to persist model switch session entry: ${String(persistErr)}`, + ); + } + continue; + } + } + + let { runResult, fallbackProvider, fallbackModel, runEndedAt } = executor.getState(); + if (!runResult) { + throw new Error("cron isolated run returned no result"); + } + + if (!params.isAborted()) { + const interimPayloads = runResult.payloads ?? []; + const { + deliveryPayloadHasStructuredContent: interimPayloadHasStructuredContent, + outputText: interimOutputText, + } = resolveCronPayloadOutcome({ + payloads: interimPayloads, + runLevelError: runResult.meta?.error, + }); + const interimText = interimOutputText?.trim() ?? ""; + const shouldRetryInterimAck = + !runResult.meta?.error && + !runResult.didSendViaMessagingTool && + !interimPayloadHasStructuredContent && + !interimPayloads.some((payload) => payload?.isError === true) && + !listDescendantRunsForRequester(params.agentSessionKey).some((entry) => { + const descendantStartedAt = + typeof entry.startedAt === "number" ? entry.startedAt : entry.createdAt; + return typeof descendantStartedAt === "number" && descendantStartedAt >= runStartedAt; + }) && + countActiveDescendantRuns(params.agentSessionKey) === 0 && + isLikelyInterimCronMessage(interimText); + + if (shouldRetryInterimAck) { + const continuationPrompt = [ + "Your previous response was only an acknowledgement and did not complete this cron task.", + "Complete the original task now.", + "Do not send a status update like 'on it'.", + "Use tools when needed, including sessions_spawn for parallel subtasks, wait for spawned subagents to finish, then return only the final summary.", + ].join(" "); + await executor.runPrompt(continuationPrompt); + ({ runResult, fallbackProvider, fallbackModel, runEndedAt } = executor.getState()); + } + } + + if (!runResult) { + throw new Error("cron isolated run returned no result"); + } + return { + runResult, + fallbackProvider, + fallbackModel, + runStartedAt, + runEndedAt, + liveSelection: params.liveSelection, + }; +} diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 7853762cfd6..285d6360f00 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -1,5 +1,8 @@ +import type { SkillSnapshot } from "../../agents/skills.js"; +import type { ThinkLevel } from "../../auto-reply/thinking.js"; import type { CliDeps } from "../../cli/outbound-send-deps.js"; import type { OpenClawConfig } from "../../config/config.js"; +import type { AgentDefaultsConfig } from "../../config/types.agent-defaults.js"; import { resolveCronDeliveryPlan } from "../delivery-plan.js"; import type { CronJob, CronRunOutcome, CronRunTelemetry } from "../types.js"; import { @@ -15,49 +18,45 @@ import { } from "./helpers.js"; import { resolveCronModelSelection } from "./model-selection.js"; import { buildCronAgentDefaultsConfig } from "./run-config.js"; +import { executeCronRun, type CronExecutionResult } from "./run-executor.js"; +import { + createPersistCronSessionEntry, + markCronSessionPreRun, + persistCronSkillsSnapshotIfChanged, + type CronLiveSelection, + type MutableCronSession, + type PersistCronSessionEntry, +} from "./run-session-state.js"; import { DEFAULT_CONTEXT_TOKENS, - LiveSessionModelSwitchError, buildSafeExternalPrompt, - countActiveDescendantRuns, deriveSessionTotalTokens, detectSuspiciousPatterns, ensureAgentWorkspace, hasNonzeroUsage, isCliProvider, isExternalHookSession, - listDescendantRunsForRequester, loadModelCatalog, logWarn, lookupContextTokens, mapHookExternalContentSource, normalizeAgentId, normalizeThinkLevel, - normalizeVerboseLevel, - registerAgentRunContext, resolveAgentConfig, resolveAgentDir, - resolveEffectiveModelFallbacks, resolveAgentTimeoutMs, resolveAgentWorkspaceDir, - resolveBootstrapWarningSignaturesSeen, resolveCronStyleNow, resolveDefaultAgentId, - resolveFastModeState, resolveHookExternalContentSource, - resolveNestedAgentLane, resolveSessionAuthProfileOverride, - resolveSessionTranscriptPath, resolveThinkingDefault, - runEmbeddedPiAgent, - runWithModelFallback, setSessionRuntimeModel, supportsXHighThinking, } from "./run.runtime.js"; import { resolveCronAgentSessionKey } from "./session-key.js"; import { resolveCronSession } from "./session.js"; import { resolveCronSkillsSnapshot } from "./skills-snapshot.js"; -import { isLikelyInterimCronMessage } from "./subagent-followup-hints.js"; let sessionStoreRuntimePromise: | Promise @@ -181,6 +180,482 @@ async function loadUsageFormatRuntime() { return await import("../../utils/usage-format.js"); } +type RunCronAgentTurnParams = { + cfg: OpenClawConfig; + deps: CliDeps; + job: CronJob; + message: string; + abortSignal?: AbortSignal; + signal?: AbortSignal; + sessionKey: string; + agentId?: string; + lane?: string; + deliveryContract?: IsolatedDeliveryContract; +}; + +type WithRunSession = ( + result: Omit, +) => RunCronAgentTurnResult; + +type PreparedCronRunContext = { + input: RunCronAgentTurnParams; + cfgWithAgentDefaults: OpenClawConfig; + agentId: string; + agentCfg: AgentDefaultsConfig; + agentDir: string; + agentSessionKey: string; + runSessionId: string; + runSessionKey: string; + workspaceDir: string; + commandBody: string; + cronSession: MutableCronSession; + persistSessionEntry: PersistCronSessionEntry; + withRunSession: WithRunSession; + agentPayload: Extract | null; + resolvedDelivery: Awaited>; + deliveryRequested: boolean; + toolPolicy: ReturnType; + skillsSnapshot: SkillSnapshot; + liveSelection: CronLiveSelection; + thinkLevel: ThinkLevel | undefined; + timeoutMs: number; +}; + +type CronPreparationResult = + | { ok: true; context: PreparedCronRunContext } + | { ok: false; result: RunCronAgentTurnResult }; + +async function prepareCronRunContext(params: { + input: RunCronAgentTurnParams; + isFastTestEnv: boolean; +}): Promise { + const { input } = params; + const defaultAgentId = resolveDefaultAgentId(input.cfg); + const requestedAgentId = + typeof input.agentId === "string" && input.agentId.trim() + ? input.agentId + : typeof input.job.agentId === "string" && input.job.agentId.trim() + ? input.job.agentId + : undefined; + const normalizedRequested = requestedAgentId ? normalizeAgentId(requestedAgentId) : undefined; + const agentConfigOverride = normalizedRequested + ? resolveAgentConfig(input.cfg, normalizedRequested) + : undefined; + const agentId = normalizedRequested ?? defaultAgentId; + const agentCfg: AgentDefaultsConfig = buildCronAgentDefaultsConfig({ + defaults: input.cfg.agents?.defaults, + agentConfigOverride, + }); + const cfgWithAgentDefaults: OpenClawConfig = { + ...input.cfg, + agents: Object.assign({}, input.cfg.agents, { defaults: agentCfg }), + }; + let catalog: Awaited> | undefined; + const loadCatalog = async () => { + if (!catalog) { + catalog = await loadModelCatalog({ config: cfgWithAgentDefaults }); + } + return catalog; + }; + + const baseSessionKey = (input.sessionKey?.trim() || `cron:${input.job.id}`).trim(); + const agentSessionKey = resolveCronAgentSessionKey({ + sessionKey: baseSessionKey, + agentId, + mainKey: input.cfg.session?.mainKey, + cfg: input.cfg, + }); + const payloadHookExternalContentSource = + input.job.payload.kind === "agentTurn" ? input.job.payload.externalContentSource : undefined; + const hookExternalContentSource = + payloadHookExternalContentSource ?? resolveHookExternalContentSource(baseSessionKey); + + const workspaceDirRaw = resolveAgentWorkspaceDir(input.cfg, agentId); + const agentDir = resolveAgentDir(input.cfg, agentId); + const workspace = await ensureAgentWorkspace({ + dir: workspaceDirRaw, + ensureBootstrapFiles: !agentCfg?.skipBootstrap && !params.isFastTestEnv, + }); + const workspaceDir = workspace.dir; + + const isGmailHook = hookExternalContentSource === "gmail"; + const now = Date.now(); + const cronSession = resolveCronSession({ + cfg: input.cfg, + sessionKey: agentSessionKey, + agentId, + nowMs: now, + forceNew: input.job.sessionTarget === "isolated", + }); + const runSessionId = cronSession.sessionEntry.sessionId; + const runSessionKey = baseSessionKey.startsWith("cron:") + ? `${agentSessionKey}:run:${runSessionId}` + : agentSessionKey; + const persistSessionEntry = createPersistCronSessionEntry({ + isFastTestEnv: params.isFastTestEnv, + cronSession, + agentSessionKey, + runSessionKey, + updateSessionStore: async (storePath, update) => { + const { updateSessionStore } = await loadSessionStoreRuntime(); + await updateSessionStore(storePath, update); + }, + }); + const withRunSession: WithRunSession = (result) => ({ + ...result, + sessionId: runSessionId, + sessionKey: runSessionKey, + }); + if (!cronSession.sessionEntry.label?.trim() && baseSessionKey.startsWith("cron:")) { + const labelSuffix = + typeof input.job.name === "string" && input.job.name.trim() + ? input.job.name.trim() + : input.job.id; + cronSession.sessionEntry.label = `Cron: ${labelSuffix}`; + } + + const resolvedModelSelection = await resolveCronModelSelection({ + cfg: input.cfg, + cfgWithAgentDefaults, + agentConfigOverride, + sessionEntry: cronSession.sessionEntry, + payload: input.job.payload, + isGmailHook, + }); + if (!resolvedModelSelection.ok) { + return { + ok: false, + result: withRunSession({ status: "error", error: resolvedModelSelection.error }), + }; + } + let provider = resolvedModelSelection.provider; + let model = resolvedModelSelection.model; + if (resolvedModelSelection.warning) { + logWarn(resolvedModelSelection.warning); + } + + const hooksGmailThinking = isGmailHook + ? normalizeThinkLevel(input.cfg.hooks?.gmail?.thinking) + : undefined; + const jobThink = normalizeThinkLevel( + (input.job.payload.kind === "agentTurn" ? input.job.payload.thinking : undefined) ?? undefined, + ); + let thinkLevel: ThinkLevel | undefined = jobThink ?? hooksGmailThinking; + if (!thinkLevel) { + thinkLevel = resolveThinkingDefault({ + cfg: cfgWithAgentDefaults, + provider, + model, + catalog: await loadCatalog(), + }); + } + if (thinkLevel === "xhigh" && !supportsXHighThinking(provider, model)) { + logWarn( + `[cron:${input.job.id}] Thinking level "xhigh" is not supported for ${provider}/${model}; downgrading to "high".`, + ); + thinkLevel = "high"; + } + + const timeoutMs = resolveAgentTimeoutMs({ + cfg: cfgWithAgentDefaults, + overrideSeconds: + input.job.payload.kind === "agentTurn" ? input.job.payload.timeoutSeconds : undefined, + }); + const agentPayload = input.job.payload.kind === "agentTurn" ? input.job.payload : null; + const { deliveryRequested, resolvedDelivery, toolPolicy } = await resolveCronDeliveryContext({ + cfg: cfgWithAgentDefaults, + job: input.job, + agentId, + deliveryContract: input.deliveryContract ?? "cron-owned", + }); + + const { formattedTime, timeLine } = resolveCronStyleNow(input.cfg, now); + const base = `[cron:${input.job.id} ${input.job.name}] ${input.message}`.trim(); + const isExternalHook = + hookExternalContentSource !== undefined || isExternalHookSession(baseSessionKey); + const allowUnsafeExternalContent = + agentPayload?.allowUnsafeExternalContent === true || + (isGmailHook && input.cfg.hooks?.gmail?.allowUnsafeExternalContent === true); + const shouldWrapExternal = isExternalHook && !allowUnsafeExternalContent; + let commandBody: string; + + if (isExternalHook) { + const suspiciousPatterns = detectSuspiciousPatterns(input.message); + if (suspiciousPatterns.length > 0) { + logWarn( + `[security] Suspicious patterns detected in external hook content ` + + `(session=${baseSessionKey}, patterns=${suspiciousPatterns.length}): ${suspiciousPatterns.slice(0, 3).join(", ")}`, + ); + } + } + + if (shouldWrapExternal) { + const hookType = mapHookExternalContentSource(hookExternalContentSource ?? "webhook"); + const safeContent = buildSafeExternalPrompt({ + content: input.message, + source: hookType, + jobName: input.job.name, + jobId: input.job.id, + timestamp: formattedTime, + }); + commandBody = `${safeContent}\n\n${timeLine}`.trim(); + } else { + commandBody = `${base}\n${timeLine}`.trim(); + } + commandBody = appendCronDeliveryInstruction({ commandBody, deliveryRequested }); + + const skillsSnapshot = resolveCronSkillsSnapshot({ + workspaceDir, + config: cfgWithAgentDefaults, + agentId, + existingSnapshot: cronSession.sessionEntry.skillsSnapshot, + isFastTestEnv: params.isFastTestEnv, + }); + await persistCronSkillsSnapshotIfChanged({ + isFastTestEnv: params.isFastTestEnv, + cronSession, + skillsSnapshot, + nowMs: Date.now(), + persistSessionEntry, + }); + + markCronSessionPreRun({ entry: cronSession.sessionEntry, provider, model }); + try { + await persistSessionEntry(); + } catch (err) { + logWarn(`[cron:${input.job.id}] Failed to persist pre-run session entry: ${String(err)}`); + } + + const authProfileId = await resolveSessionAuthProfileOverride({ + cfg: cfgWithAgentDefaults, + provider, + agentDir, + sessionEntry: cronSession.sessionEntry, + sessionStore: cronSession.store, + sessionKey: agentSessionKey, + storePath: cronSession.storePath, + isNewSession: cronSession.isNewSession, + }); + const liveSelection: CronLiveSelection = { + provider, + model, + authProfileId, + authProfileIdSource: authProfileId + ? cronSession.sessionEntry.authProfileOverrideSource + : undefined, + }; + + return { + ok: true, + context: { + input, + cfgWithAgentDefaults, + agentId, + agentCfg, + agentDir, + agentSessionKey, + runSessionId, + runSessionKey, + workspaceDir, + commandBody, + cronSession, + persistSessionEntry, + withRunSession, + agentPayload, + resolvedDelivery, + deliveryRequested, + toolPolicy, + skillsSnapshot, + liveSelection, + thinkLevel, + timeoutMs, + }, + }; +} + +async function finalizeCronRun(params: { + prepared: PreparedCronRunContext; + execution: CronExecutionResult; + abortReason: () => string; + isAborted: () => boolean; +}): Promise { + const { prepared, execution } = params; + const finalRunResult = execution.runResult; + const payloads = finalRunResult.payloads ?? []; + let telemetry: CronRunTelemetry | undefined; + + if (finalRunResult.meta?.systemPromptReport) { + prepared.cronSession.sessionEntry.systemPromptReport = finalRunResult.meta.systemPromptReport; + } + const usage = finalRunResult.meta?.agentMeta?.usage; + const promptTokens = finalRunResult.meta?.agentMeta?.promptTokens; + const modelUsed = + finalRunResult.meta?.agentMeta?.model ?? + execution.fallbackModel ?? + execution.liveSelection.model; + const providerUsed = + finalRunResult.meta?.agentMeta?.provider ?? + execution.fallbackProvider ?? + execution.liveSelection.provider; + const contextTokens = + resolvePositiveContextTokens(prepared.agentCfg?.contextTokens) ?? + lookupContextTokens(modelUsed, { allowAsyncLoad: false }) ?? + resolvePositiveContextTokens(prepared.cronSession.sessionEntry.contextTokens) ?? + DEFAULT_CONTEXT_TOKENS; + + setSessionRuntimeModel(prepared.cronSession.sessionEntry, { + provider: providerUsed, + model: modelUsed, + }); + prepared.cronSession.sessionEntry.contextTokens = contextTokens; + if (isCliProvider(providerUsed, prepared.cfgWithAgentDefaults)) { + const cliSessionId = finalRunResult.meta?.agentMeta?.sessionId?.trim(); + if (cliSessionId) { + const { setCliSessionId } = await loadCliRunnerRuntime(); + setCliSessionId(prepared.cronSession.sessionEntry, providerUsed, cliSessionId); + } + } + if (hasNonzeroUsage(usage)) { + const { estimateUsageCost, resolveModelCostConfig } = await loadUsageFormatRuntime(); + const input = usage.input ?? 0; + const output = usage.output ?? 0; + const totalTokens = deriveSessionTotalTokens({ + usage, + contextTokens, + promptTokens, + }); + const runEstimatedCostUsd = resolveNonNegativeNumber( + estimateUsageCost({ + usage, + cost: resolveModelCostConfig({ + provider: providerUsed, + model: modelUsed, + config: prepared.cfgWithAgentDefaults, + }), + }), + ); + prepared.cronSession.sessionEntry.inputTokens = input; + prepared.cronSession.sessionEntry.outputTokens = output; + const telemetryUsage: NonNullable = { + input_tokens: input, + output_tokens: output, + }; + if (typeof totalTokens === "number" && Number.isFinite(totalTokens) && totalTokens > 0) { + prepared.cronSession.sessionEntry.totalTokens = totalTokens; + prepared.cronSession.sessionEntry.totalTokensFresh = true; + telemetryUsage.total_tokens = totalTokens; + } else { + prepared.cronSession.sessionEntry.totalTokens = undefined; + prepared.cronSession.sessionEntry.totalTokensFresh = false; + } + prepared.cronSession.sessionEntry.cacheRead = usage.cacheRead ?? 0; + prepared.cronSession.sessionEntry.cacheWrite = usage.cacheWrite ?? 0; + if (runEstimatedCostUsd !== undefined) { + prepared.cronSession.sessionEntry.estimatedCostUsd = + (resolveNonNegativeNumber(prepared.cronSession.sessionEntry.estimatedCostUsd) ?? 0) + + runEstimatedCostUsd; + } + telemetry = { + model: modelUsed, + provider: providerUsed, + usage: telemetryUsage, + }; + } else { + telemetry = { model: modelUsed, provider: providerUsed }; + } + await prepared.persistSessionEntry(); + + if (params.isAborted()) { + return prepared.withRunSession({ status: "error", error: params.abortReason(), ...telemetry }); + } + let { + summary, + outputText, + synthesizedText, + deliveryPayloads, + deliveryPayloadHasStructuredContent, + hasFatalErrorPayload, + embeddedRunError, + } = resolveCronPayloadOutcome({ + payloads, + runLevelError: finalRunResult.meta?.error, + }); + const resolveRunOutcome = (result?: { delivered?: boolean; deliveryAttempted?: boolean }) => + prepared.withRunSession({ + status: hasFatalErrorPayload ? "error" : "ok", + ...(hasFatalErrorPayload + ? { error: embeddedRunError ?? "cron isolated run returned an error payload" } + : {}), + summary, + outputText, + delivered: result?.delivered, + deliveryAttempted: result?.deliveryAttempted, + ...telemetry, + }); + + const skipHeartbeatDelivery = + prepared.deliveryRequested && + isHeartbeatOnlyResponse(payloads, resolveHeartbeatAckMaxChars(prepared.agentCfg)); + const skipMessagingToolDelivery = + (prepared.input.deliveryContract ?? "cron-owned") === "shared" && + prepared.deliveryRequested && + finalRunResult.didSendViaMessagingTool === true && + (finalRunResult.messagingToolSentTargets ?? []).some((target) => + matchesMessagingToolDeliveryTarget(target, { + channel: prepared.resolvedDelivery.channel, + to: prepared.resolvedDelivery.to, + accountId: prepared.resolvedDelivery.accountId, + }), + ); + const deliveryResult = await dispatchCronDelivery({ + cfg: prepared.input.cfg, + cfgWithAgentDefaults: prepared.cfgWithAgentDefaults, + deps: prepared.input.deps, + job: prepared.input.job, + agentId: prepared.agentId, + agentSessionKey: prepared.agentSessionKey, + runSessionId: prepared.runSessionId, + runStartedAt: execution.runStartedAt, + runEndedAt: execution.runEndedAt, + timeoutMs: prepared.timeoutMs, + resolvedDelivery: prepared.resolvedDelivery, + deliveryRequested: prepared.deliveryRequested, + skipHeartbeatDelivery, + skipMessagingToolDelivery, + deliveryBestEffort: resolveCronDeliveryBestEffort(prepared.input.job), + deliveryPayloadHasStructuredContent, + deliveryPayloads, + synthesizedText, + summary, + outputText, + telemetry, + abortSignal: prepared.input.abortSignal ?? prepared.input.signal, + isAborted: params.isAborted, + abortReason: params.abortReason, + withRunSession: prepared.withRunSession, + }); + if (deliveryResult.result) { + const resultWithDeliveryMeta: RunCronAgentTurnResult = { + ...deliveryResult.result, + deliveryAttempted: + deliveryResult.result.deliveryAttempted ?? deliveryResult.deliveryAttempted, + }; + if (!hasFatalErrorPayload || deliveryResult.result.status !== "ok") { + return resultWithDeliveryMeta; + } + return resolveRunOutcome({ + delivered: deliveryResult.result.delivered, + deliveryAttempted: resultWithDeliveryMeta.deliveryAttempted, + }); + } + summary = deliveryResult.summary; + outputText = deliveryResult.outputText; + return resolveRunOutcome({ + delivered: deliveryResult.delivered, + deliveryAttempted: deliveryResult.deliveryAttempted, + }); +} + export async function runCronIsolatedAgentTurn(params: { cfg: OpenClawConfig; deps: CliDeps; @@ -202,695 +677,49 @@ export async function runCronIsolatedAgentTurn(params: { : "cron: job execution timed out"; }; const isFastTestEnv = process.env.OPENCLAW_TEST_FAST === "1"; - const deliveryContract = params.deliveryContract ?? "cron-owned"; - const defaultAgentId = resolveDefaultAgentId(params.cfg); - const requestedAgentId = - typeof params.agentId === "string" && params.agentId.trim() - ? params.agentId - : typeof params.job.agentId === "string" && params.job.agentId.trim() - ? params.job.agentId - : undefined; - const normalizedRequested = requestedAgentId ? normalizeAgentId(requestedAgentId) : undefined; - const agentConfigOverride = normalizedRequested - ? resolveAgentConfig(params.cfg, normalizedRequested) - : undefined; - // Use the requested agentId even when there is no explicit agent config entry. - // This ensures auth-profiles, workspace, and agentDir all resolve to the - // correct per-agent paths (e.g. ~/.openclaw/agents//agent/). - const agentId = normalizedRequested ?? defaultAgentId; - const agentCfg = buildCronAgentDefaultsConfig({ - defaults: params.cfg.agents?.defaults, - agentConfigOverride, - }); - const cfgWithAgentDefaults: OpenClawConfig = { - ...params.cfg, - agents: Object.assign({}, params.cfg.agents, { defaults: agentCfg }), - }; - let catalog: Awaited> | undefined; - const loadCatalog = async () => { - if (!catalog) { - catalog = await loadModelCatalog({ config: cfgWithAgentDefaults }); - } - return catalog; - }; - - const baseSessionKey = (params.sessionKey?.trim() || `cron:${params.job.id}`).trim(); - const agentSessionKey = resolveCronAgentSessionKey({ - sessionKey: baseSessionKey, - agentId, - mainKey: params.cfg.session?.mainKey, - cfg: params.cfg, - }); - const payloadHookExternalContentSource = - params.job.payload.kind === "agentTurn" ? params.job.payload.externalContentSource : undefined; - const hookExternalContentSource = - payloadHookExternalContentSource ?? resolveHookExternalContentSource(baseSessionKey); - - const workspaceDirRaw = resolveAgentWorkspaceDir(params.cfg, agentId); - const agentDir = resolveAgentDir(params.cfg, agentId); - const workspace = await ensureAgentWorkspace({ - dir: workspaceDirRaw, - ensureBootstrapFiles: !agentCfg?.skipBootstrap && !isFastTestEnv, - }); - const workspaceDir = workspace.dir; - - // Resolve model - prefer hooks.gmail.model for Gmail hooks. - const isGmailHook = hookExternalContentSource === "gmail"; - const now = Date.now(); - const cronSession = resolveCronSession({ - cfg: params.cfg, - sessionKey: agentSessionKey, - agentId, - nowMs: now, - // Isolated cron runs must not carry prior turn context across executions. - forceNew: params.job.sessionTarget === "isolated", - }); - const runSessionId = cronSession.sessionEntry.sessionId; - const runSessionKey = baseSessionKey.startsWith("cron:") - ? `${agentSessionKey}:run:${runSessionId}` - : agentSessionKey; - const persistSessionEntry = async () => { - if (isFastTestEnv) { - return; - } - cronSession.store[agentSessionKey] = cronSession.sessionEntry; - if (runSessionKey !== agentSessionKey) { - cronSession.store[runSessionKey] = cronSession.sessionEntry; - } - const { updateSessionStore } = await loadSessionStoreRuntime(); - await updateSessionStore(cronSession.storePath, (store) => { - store[agentSessionKey] = cronSession.sessionEntry; - if (runSessionKey !== agentSessionKey) { - store[runSessionKey] = cronSession.sessionEntry; - } - }); - }; - const withRunSession = ( - result: Omit, - ): RunCronAgentTurnResult => ({ - ...result, - sessionId: runSessionId, - sessionKey: runSessionKey, - }); - if (!cronSession.sessionEntry.label?.trim() && baseSessionKey.startsWith("cron:")) { - const labelSuffix = - typeof params.job.name === "string" && params.job.name.trim() - ? params.job.name.trim() - : params.job.id; - cronSession.sessionEntry.label = `Cron: ${labelSuffix}`; + const prepared = await prepareCronRunContext({ input: params, isFastTestEnv }); + if (!prepared.ok) { + return prepared.result; } - const resolvedModelSelection = await resolveCronModelSelection({ - cfg: params.cfg, - cfgWithAgentDefaults, - agentConfigOverride, - sessionEntry: cronSession.sessionEntry, - payload: params.job.payload, - isGmailHook, - }); - if (!resolvedModelSelection.ok) { - return { status: "error", error: resolvedModelSelection.error }; - } - let provider = resolvedModelSelection.provider; - let model = resolvedModelSelection.model; - if (resolvedModelSelection.warning) { - logWarn(resolvedModelSelection.warning); - } - - // Resolve thinking level - job thinking > hooks.gmail.thinking > model/global defaults - const hooksGmailThinking = isGmailHook - ? normalizeThinkLevel(params.cfg.hooks?.gmail?.thinking) - : undefined; - const jobThink = normalizeThinkLevel( - (params.job.payload.kind === "agentTurn" ? params.job.payload.thinking : undefined) ?? - undefined, - ); - let thinkLevel = jobThink ?? hooksGmailThinking; - if (!thinkLevel) { - thinkLevel = resolveThinkingDefault({ - cfg: cfgWithAgentDefaults, - provider, - model, - catalog: await loadCatalog(), - }); - } - if (thinkLevel === "xhigh" && !supportsXHighThinking(provider, model)) { - logWarn( - `[cron:${params.job.id}] Thinking level "xhigh" is not supported for ${provider}/${model}; downgrading to "high".`, - ); - thinkLevel = "high"; - } - - const timeoutMs = resolveAgentTimeoutMs({ - cfg: cfgWithAgentDefaults, - overrideSeconds: - params.job.payload.kind === "agentTurn" ? params.job.payload.timeoutSeconds : undefined, - }); - - const agentPayload = params.job.payload.kind === "agentTurn" ? params.job.payload : null; - const { deliveryRequested, resolvedDelivery, toolPolicy } = await resolveCronDeliveryContext({ - cfg: cfgWithAgentDefaults, - job: params.job, - agentId, - deliveryContract, - }); - - const { formattedTime, timeLine } = resolveCronStyleNow(params.cfg, now); - const base = `[cron:${params.job.id} ${params.job.name}] ${params.message}`.trim(); - - // SECURITY: Wrap external hook content with security boundaries to prevent prompt injection - // unless explicitly allowed via a dangerous config override. - const isExternalHook = - hookExternalContentSource !== undefined || isExternalHookSession(baseSessionKey); - const allowUnsafeExternalContent = - agentPayload?.allowUnsafeExternalContent === true || - (isGmailHook && params.cfg.hooks?.gmail?.allowUnsafeExternalContent === true); - const shouldWrapExternal = isExternalHook && !allowUnsafeExternalContent; - let commandBody: string; - - if (isExternalHook) { - // Log suspicious patterns for security monitoring - const suspiciousPatterns = detectSuspiciousPatterns(params.message); - if (suspiciousPatterns.length > 0) { - logWarn( - `[security] Suspicious patterns detected in external hook content ` + - `(session=${baseSessionKey}, patterns=${suspiciousPatterns.length}): ${suspiciousPatterns.slice(0, 3).join(", ")}`, - ); - } - } - - if (shouldWrapExternal) { - // Wrap external content with security boundaries - const hookType = mapHookExternalContentSource(hookExternalContentSource ?? "webhook"); - const safeContent = buildSafeExternalPrompt({ - content: params.message, - source: hookType, - jobName: params.job.name, - jobId: params.job.id, - timestamp: formattedTime, - }); - - commandBody = `${safeContent}\n\n${timeLine}`.trim(); - } else { - // Internal/trusted source - use original format - commandBody = `${base}\n${timeLine}`.trim(); - } - commandBody = appendCronDeliveryInstruction({ commandBody, deliveryRequested }); - - const existingSkillsSnapshot = cronSession.sessionEntry.skillsSnapshot; - const skillsSnapshot = resolveCronSkillsSnapshot({ - workspaceDir, - config: cfgWithAgentDefaults, - agentId, - existingSnapshot: existingSkillsSnapshot, - isFastTestEnv, - }); - if (!isFastTestEnv && skillsSnapshot !== existingSkillsSnapshot) { - cronSession.sessionEntry = { - ...cronSession.sessionEntry, - updatedAt: Date.now(), - skillsSnapshot, - }; - await persistSessionEntry(); - } - - // Persist the intended model and systemSent before the run so that - // sessions_list reflects the cron override even if the run fails or is - // still in progress (#21057). Best-effort: a filesystem error here - // must not prevent the actual agent run from executing. - cronSession.sessionEntry.modelProvider = provider; - cronSession.sessionEntry.model = model; - cronSession.sessionEntry.systemSent = true; try { - await persistSessionEntry(); + const execution = await executeCronRun({ + cfg: params.cfg, + cfgWithAgentDefaults: prepared.context.cfgWithAgentDefaults, + job: params.job, + agentId: prepared.context.agentId, + agentDir: prepared.context.agentDir, + agentSessionKey: prepared.context.agentSessionKey, + workspaceDir: prepared.context.workspaceDir, + lane: params.lane, + resolvedDelivery: { + channel: prepared.context.resolvedDelivery.channel, + accountId: prepared.context.resolvedDelivery.accountId, + }, + toolPolicy: prepared.context.toolPolicy, + skillsSnapshot: prepared.context.skillsSnapshot, + agentPayload: prepared.context.agentPayload, + agentVerboseDefault: prepared.context.agentCfg?.verboseDefault, + liveSelection: prepared.context.liveSelection, + cronSession: prepared.context.cronSession, + commandBody: prepared.context.commandBody, + persistSessionEntry: prepared.context.persistSessionEntry, + abortSignal, + abortReason, + isAborted, + thinkLevel: prepared.context.thinkLevel, + timeoutMs: prepared.context.timeoutMs, + }); + if (isAborted()) { + return prepared.context.withRunSession({ status: "error", error: abortReason() }); + } + return await finalizeCronRun({ + prepared: prepared.context, + execution, + abortReason, + isAborted, + }); } catch (err) { - logWarn(`[cron:${params.job.id}] Failed to persist pre-run session entry: ${String(err)}`); + return prepared.context.withRunSession({ status: "error", error: String(err) }); } - - // Resolve auth profile for the session, mirroring the inbound auto-reply path - // (get-reply-run.ts). Without this, isolated cron sessions fall back to env-var - // auth which may not match the configured auth-profiles, causing 401 errors. - const authProfileId = await resolveSessionAuthProfileOverride({ - cfg: cfgWithAgentDefaults, - provider, - agentDir, - sessionEntry: cronSession.sessionEntry, - sessionStore: cronSession.store, - sessionKey: agentSessionKey, - storePath: cronSession.storePath, - isNewSession: cronSession.isNewSession, - }); - let liveSelection = { - provider, - model, - authProfileId, - authProfileIdSource: authProfileId - ? cronSession.sessionEntry.authProfileOverrideSource - : undefined, - }; - const syncSessionEntryLiveSelection = () => { - cronSession.sessionEntry.modelProvider = liveSelection.provider; - cronSession.sessionEntry.model = liveSelection.model; - if (liveSelection.authProfileId) { - cronSession.sessionEntry.authProfileOverride = liveSelection.authProfileId; - cronSession.sessionEntry.authProfileOverrideSource = liveSelection.authProfileIdSource; - if (liveSelection.authProfileIdSource === "auto") { - cronSession.sessionEntry.authProfileOverrideCompactionCount = - cronSession.sessionEntry.compactionCount ?? 0; - } else { - delete cronSession.sessionEntry.authProfileOverrideCompactionCount; - } - return; - } - delete cronSession.sessionEntry.authProfileOverride; - delete cronSession.sessionEntry.authProfileOverrideSource; - delete cronSession.sessionEntry.authProfileOverrideCompactionCount; - }; - - let runResult: Awaited> | undefined; - let fallbackProvider = liveSelection.provider; - let fallbackModel = liveSelection.model; - const runStartedAt = Date.now(); - let runEndedAt = runStartedAt; - try { - const sessionFile = resolveSessionTranscriptPath(cronSession.sessionEntry.sessionId, agentId); - const resolvedVerboseLevel = - normalizeVerboseLevel(cronSession.sessionEntry.verboseLevel) ?? - normalizeVerboseLevel(agentCfg?.verboseDefault) ?? - "off"; - registerAgentRunContext(cronSession.sessionEntry.sessionId, { - sessionKey: agentSessionKey, - verboseLevel: resolvedVerboseLevel, - }); - const messageChannel = resolvedDelivery.channel; - // Per-job payload.fallbacks takes priority over agent-level fallbacks. - const payloadFallbacks = - params.job.payload.kind === "agentTurn" && Array.isArray(params.job.payload.fallbacks) - ? params.job.payload.fallbacks - : undefined; - // Keep cron fallback semantics aligned with the shared agent path: - // explicit model overrides inherit configured fallback chains, but they - // must not silently append the agent primary as a last-resort candidate. - // See: https://github.com/openclaw/openclaw/issues/58065 - const hasCronPayloadModelOverride = - params.job.payload.kind === "agentTurn" && - typeof params.job.payload.model === "string" && - params.job.payload.model.trim().length > 0; - const cronFallbacksOverride = - payloadFallbacks ?? - resolveEffectiveModelFallbacks({ - cfg: params.cfg, - agentId, - hasSessionModelOverride: hasCronPayloadModelOverride, - }); - let bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( - cronSession.sessionEntry.systemPromptReport, - ); - - const runPrompt = async (promptText: string) => { - const fallbackResult = await runWithModelFallback({ - cfg: cfgWithAgentDefaults, - provider: liveSelection.provider, - model: liveSelection.model, - runId: cronSession.sessionEntry.sessionId, - agentDir, - fallbacksOverride: cronFallbacksOverride, - run: async (providerOverride, modelOverride, runOptions) => { - if (abortSignal?.aborted) { - throw new Error(abortReason()); - } - const bootstrapPromptWarningSignature = - bootstrapPromptWarningSignaturesSeen[bootstrapPromptWarningSignaturesSeen.length - 1]; - if (isCliProvider(providerOverride, cfgWithAgentDefaults)) { - const { getCliSessionId, runCliAgent } = await loadCliRunnerRuntime(); - // Fresh isolated cron sessions must not reuse a stored CLI session ID. - // Passing an existing ID activates the resume watchdog profile - // (noOutputTimeoutRatio 0.3, maxMs 180 s) instead of the fresh profile - // (ratio 0.8, maxMs 600 s), causing jobs to time out at roughly 1/3 of - // the configured timeoutSeconds. See: https://github.com/openclaw/openclaw/issues/29774 - const cliSessionId = cronSession.isNewSession - ? undefined - : getCliSessionId(cronSession.sessionEntry, providerOverride); - const result = await runCliAgent({ - sessionId: cronSession.sessionEntry.sessionId, - sessionKey: agentSessionKey, - agentId, - sessionFile, - workspaceDir, - config: cfgWithAgentDefaults, - prompt: promptText, - provider: providerOverride, - model: modelOverride, - thinkLevel, - timeoutMs, - runId: cronSession.sessionEntry.sessionId, - cliSessionId, - bootstrapPromptWarningSignaturesSeen, - bootstrapPromptWarningSignature, - }); - bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( - result.meta?.systemPromptReport, - ); - return result; - } - const result = await runEmbeddedPiAgent({ - sessionId: cronSession.sessionEntry.sessionId, - sessionKey: agentSessionKey, - agentId, - trigger: "cron", - // Cron runs execute inside the gateway process and need the same - // explicit subagent late-binding as other gateway-owned runners. - allowGatewaySubagentBinding: true, - // Cron jobs are trusted local automation, so isolated runs should - // inherit owner-only tooling like local `openclaw agent` runs. - senderIsOwner: true, - messageChannel, - agentAccountId: resolvedDelivery.accountId, - sessionFile, - agentDir, - workspaceDir, - config: cfgWithAgentDefaults, - skillsSnapshot, - prompt: promptText, - lane: resolveNestedAgentLane(params.lane), - provider: providerOverride, - model: modelOverride, - authProfileId: liveSelection.authProfileId, - authProfileIdSource: liveSelection.authProfileId - ? liveSelection.authProfileIdSource - : undefined, - thinkLevel, - fastMode: resolveFastModeState({ - cfg: cfgWithAgentDefaults, - provider: providerOverride, - model: modelOverride, - agentId, - sessionEntry: cronSession.sessionEntry, - }).enabled, - verboseLevel: resolvedVerboseLevel, - timeoutMs, - bootstrapContextMode: agentPayload?.lightContext ? "lightweight" : undefined, - bootstrapContextRunKind: "cron", - toolsAllow: agentPayload?.toolsAllow, - runId: cronSession.sessionEntry.sessionId, - requireExplicitMessageTarget: toolPolicy.requireExplicitMessageTarget, - disableMessageTool: toolPolicy.disableMessageTool, - allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe, - abortSignal, - bootstrapPromptWarningSignaturesSeen, - bootstrapPromptWarningSignature, - }); - bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( - result.meta?.systemPromptReport, - ); - return result; - }, - }); - runResult = fallbackResult.result; - fallbackProvider = fallbackResult.provider; - fallbackModel = fallbackResult.model; - liveSelection.provider = fallbackResult.provider; - liveSelection.model = fallbackResult.model; - runEndedAt = Date.now(); - }; - - // Retry loop: if the isolated session starts with the wrong model (e.g. the - // gateway default) and the runner detects a LiveSessionModelSwitchError, we - // restart with the model requested by the error — mirroring the retry logic - // in the main agent runner (agent-runner-execution.ts). Without this, cron - // jobs that specify a model different from the agent primary always fail. - // See: https://github.com/openclaw/openclaw/issues/57206 - // - // Circuit breaker: cap retries to prevent infinite loops when the live - // session model switch guard fires repeatedly during failover (#58466). - const MAX_MODEL_SWITCH_RETRIES = 2; - let modelSwitchRetries = 0; - while (true) { - try { - await runPrompt(commandBody); - break; - } catch (err) { - if (err instanceof LiveSessionModelSwitchError) { - modelSwitchRetries += 1; - if (modelSwitchRetries > MAX_MODEL_SWITCH_RETRIES) { - logWarn( - `[cron:${params.job.id}] LiveSessionModelSwitchError retry limit reached (${MAX_MODEL_SWITCH_RETRIES}); aborting`, - ); - throw err; - } - liveSelection = { - provider: err.provider, - model: err.model, - authProfileId: err.authProfileId, - authProfileIdSource: err.authProfileId ? err.authProfileIdSource : undefined, - }; - fallbackProvider = err.provider; - fallbackModel = err.model; - syncSessionEntryLiveSelection(); - // Persist the corrected model before retrying so sessions_list - // reflects the real model even if the retry also fails. - try { - await persistSessionEntry(); - } catch (persistErr) { - logWarn( - `[cron:${params.job.id}] Failed to persist model switch session entry: ${String(persistErr)}`, - ); - } - continue; - } - throw err; - } - } - if (!runResult) { - throw new Error("cron isolated run returned no result"); - } - - // Guardrail for cron jobs: if the first turn is only an interim ack - // (e.g. "on it") and no descendants are active, run one focused follow-up - // turn so the cron run returns an actual completion. - if (!isAborted()) { - const interimRunResult = runResult; - const interimPayloads = interimRunResult.payloads ?? []; - const { - deliveryPayloadHasStructuredContent: interimPayloadHasStructuredContent, - outputText: interimOutputText, - } = resolveCronPayloadOutcome({ - payloads: interimPayloads, - runLevelError: interimRunResult.meta?.error, - }); - const interimText = interimOutputText?.trim() ?? ""; - const hasDescendantsSinceRunStart = listDescendantRunsForRequester(agentSessionKey).some( - (entry) => { - const descendantStartedAt = - typeof entry.startedAt === "number" ? entry.startedAt : entry.createdAt; - return typeof descendantStartedAt === "number" && descendantStartedAt >= runStartedAt; - }, - ); - const shouldRetryInterimAck = - !interimRunResult.meta?.error && - !interimRunResult.didSendViaMessagingTool && - !interimPayloadHasStructuredContent && - !interimPayloads.some((payload) => payload?.isError === true) && - countActiveDescendantRuns(agentSessionKey) === 0 && - !hasDescendantsSinceRunStart && - isLikelyInterimCronMessage(interimText); - - if (shouldRetryInterimAck) { - const continuationPrompt = [ - "Your previous response was only an acknowledgement and did not complete this cron task.", - "Complete the original task now.", - "Do not send a status update like 'on it'.", - "Use tools when needed, including sessions_spawn for parallel subtasks, wait for spawned subagents to finish, then return only the final summary.", - ].join(" "); - await runPrompt(continuationPrompt); - } - } - } catch (err) { - return withRunSession({ status: "error", error: String(err) }); - } - - if (isAborted()) { - return withRunSession({ status: "error", error: abortReason() }); - } - if (!runResult) { - return withRunSession({ status: "error", error: "cron isolated run returned no result" }); - } - const finalRunResult = runResult; - const payloads = finalRunResult.payloads ?? []; - - // Update token+model fields in the session store. - // Also collect best-effort telemetry for the cron run log. - let telemetry: CronRunTelemetry | undefined; - { - if (finalRunResult.meta?.systemPromptReport) { - cronSession.sessionEntry.systemPromptReport = finalRunResult.meta.systemPromptReport; - } - const usage = finalRunResult.meta?.agentMeta?.usage; - const promptTokens = finalRunResult.meta?.agentMeta?.promptTokens; - const modelUsed = finalRunResult.meta?.agentMeta?.model ?? fallbackModel ?? liveSelection.model; - const providerUsed = - finalRunResult.meta?.agentMeta?.provider ?? fallbackProvider ?? liveSelection.provider; - const contextTokens = - resolvePositiveContextTokens(agentCfg?.contextTokens) ?? - lookupContextTokens(modelUsed, { allowAsyncLoad: false }) ?? - resolvePositiveContextTokens(cronSession.sessionEntry.contextTokens) ?? - DEFAULT_CONTEXT_TOKENS; - - setSessionRuntimeModel(cronSession.sessionEntry, { - provider: providerUsed, - model: modelUsed, - }); - cronSession.sessionEntry.contextTokens = contextTokens; - if (isCliProvider(providerUsed, cfgWithAgentDefaults)) { - const cliSessionId = finalRunResult.meta?.agentMeta?.sessionId?.trim(); - if (cliSessionId) { - const { setCliSessionId } = await loadCliRunnerRuntime(); - setCliSessionId(cronSession.sessionEntry, providerUsed, cliSessionId); - } - } - if (hasNonzeroUsage(usage)) { - const { estimateUsageCost, resolveModelCostConfig } = await loadUsageFormatRuntime(); - const input = usage.input ?? 0; - const output = usage.output ?? 0; - const totalTokens = deriveSessionTotalTokens({ - usage, - contextTokens, - promptTokens, - }); - const runEstimatedCostUsd = resolveNonNegativeNumber( - estimateUsageCost({ - usage, - cost: resolveModelCostConfig({ - provider: providerUsed, - model: modelUsed, - config: cfgWithAgentDefaults, - }), - }), - ); - cronSession.sessionEntry.inputTokens = input; - cronSession.sessionEntry.outputTokens = output; - const telemetryUsage: NonNullable = { - input_tokens: input, - output_tokens: output, - }; - if (typeof totalTokens === "number" && Number.isFinite(totalTokens) && totalTokens > 0) { - cronSession.sessionEntry.totalTokens = totalTokens; - cronSession.sessionEntry.totalTokensFresh = true; - telemetryUsage.total_tokens = totalTokens; - } else { - cronSession.sessionEntry.totalTokens = undefined; - cronSession.sessionEntry.totalTokensFresh = false; - } - cronSession.sessionEntry.cacheRead = usage.cacheRead ?? 0; - cronSession.sessionEntry.cacheWrite = usage.cacheWrite ?? 0; - if (runEstimatedCostUsd !== undefined) { - cronSession.sessionEntry.estimatedCostUsd = - (resolveNonNegativeNumber(cronSession.sessionEntry.estimatedCostUsd) ?? 0) + - runEstimatedCostUsd; - } - - telemetry = { - model: modelUsed, - provider: providerUsed, - usage: telemetryUsage, - }; - } else { - telemetry = { - model: modelUsed, - provider: providerUsed, - }; - } - await persistSessionEntry(); - } - - if (isAborted()) { - return withRunSession({ status: "error", error: abortReason(), ...telemetry }); - } - let { - summary, - outputText, - synthesizedText, - deliveryPayloads, - deliveryPayloadHasStructuredContent, - hasFatalErrorPayload, - embeddedRunError, - } = resolveCronPayloadOutcome({ - payloads, - runLevelError: finalRunResult.meta?.error, - }); - const deliveryBestEffort = resolveCronDeliveryBestEffort(params.job); - const resolveRunOutcome = (params?: { delivered?: boolean; deliveryAttempted?: boolean }) => - withRunSession({ - status: hasFatalErrorPayload ? "error" : "ok", - ...(hasFatalErrorPayload - ? { error: embeddedRunError ?? "cron isolated run returned an error payload" } - : {}), - summary, - outputText, - delivered: params?.delivered, - deliveryAttempted: params?.deliveryAttempted, - ...telemetry, - }); - - // Skip delivery for heartbeat-only responses (HEARTBEAT_OK with no real content). - const ackMaxChars = resolveHeartbeatAckMaxChars(agentCfg); - const skipHeartbeatDelivery = deliveryRequested && isHeartbeatOnlyResponse(payloads, ackMaxChars); - const skipMessagingToolDelivery = - deliveryContract === "shared" && - deliveryRequested && - finalRunResult.didSendViaMessagingTool === true && - (finalRunResult.messagingToolSentTargets ?? []).some((target) => - matchesMessagingToolDeliveryTarget(target, { - channel: resolvedDelivery.channel, - to: resolvedDelivery.to, - accountId: resolvedDelivery.accountId, - }), - ); - const deliveryResult = await dispatchCronDelivery({ - cfg: params.cfg, - cfgWithAgentDefaults, - deps: params.deps, - job: params.job, - agentId, - agentSessionKey, - runSessionId, - runStartedAt, - runEndedAt, - timeoutMs, - resolvedDelivery, - deliveryRequested, - skipHeartbeatDelivery, - skipMessagingToolDelivery, - deliveryBestEffort, - deliveryPayloadHasStructuredContent, - deliveryPayloads, - synthesizedText, - summary, - outputText, - telemetry, - abortSignal, - isAborted, - abortReason, - withRunSession, - }); - if (deliveryResult.result) { - const resultWithDeliveryMeta: RunCronAgentTurnResult = { - ...deliveryResult.result, - deliveryAttempted: - deliveryResult.result.deliveryAttempted ?? deliveryResult.deliveryAttempted, - }; - if (!hasFatalErrorPayload || deliveryResult.result.status !== "ok") { - return resultWithDeliveryMeta; - } - return resolveRunOutcome({ - delivered: deliveryResult.result.delivered, - deliveryAttempted: resultWithDeliveryMeta.deliveryAttempted, - }); - } - const delivered = deliveryResult.delivered; - const deliveryAttempted = deliveryResult.deliveryAttempted; - summary = deliveryResult.summary; - outputText = deliveryResult.outputText; - - return resolveRunOutcome({ delivered, deliveryAttempted }); }