refactor: split isolated cron runner phases

This commit is contained in:
Peter Steinberger
2026-04-04 14:42:27 +01:00
parent 9698ba7215
commit bbb73d3171
2 changed files with 876 additions and 699 deletions

View File

@@ -0,0 +1,348 @@
import type { SkillSnapshot } from "../../agents/skills.js";
import type { ThinkLevel, VerboseLevel } from "../../auto-reply/thinking.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { AgentDefaultsConfig } from "../../config/types.agent-defaults.js";
import type { CronJob } from "../types.js";
import { resolveCronPayloadOutcome } from "./helpers.js";
import {
countActiveDescendantRuns,
listDescendantRunsForRequester,
LiveSessionModelSwitchError,
getCliSessionId,
isCliProvider,
logWarn,
normalizeVerboseLevel,
registerAgentRunContext,
resolveBootstrapWarningSignaturesSeen,
resolveFastModeState,
resolveNestedAgentLane,
resolveSessionTranscriptPath,
runCliAgent,
runEmbeddedPiAgent,
runWithModelFallback,
} from "./run-execution.runtime.js";
import { resolveCronFallbacksOverride } from "./run-fallback-policy.js";
import type {
CronLiveSelection,
MutableCronSession,
PersistCronSessionEntry,
} from "./run-session-state.js";
import { syncCronSessionLiveSelection } from "./run-session-state.js";
import { isLikelyInterimCronMessage } from "./subagent-followup-hints.js";
type AgentTurnPayload = Extract<CronJob["payload"], { kind: "agentTurn" }> | null;
type CronPromptRunResult = Awaited<ReturnType<typeof runCliAgent>>;
export type CronExecutionResult = {
runResult: CronPromptRunResult;
fallbackProvider: string;
fallbackModel: string;
runStartedAt: number;
runEndedAt: number;
liveSelection: CronLiveSelection;
};
export function createCronPromptExecutor(params: {
cfg: OpenClawConfig;
cfgWithAgentDefaults: OpenClawConfig;
job: CronJob;
agentId: string;
agentDir: string;
agentSessionKey: string;
workspaceDir: string;
lane?: string;
resolvedVerboseLevel: VerboseLevel;
thinkLevel: ThinkLevel | undefined;
timeoutMs: number;
messageChannel: string | undefined;
resolvedDelivery: { accountId?: string };
toolPolicy: {
requireExplicitMessageTarget: boolean;
disableMessageTool: boolean;
};
skillsSnapshot: SkillSnapshot;
agentPayload: AgentTurnPayload;
liveSelection: CronLiveSelection;
cronSession: MutableCronSession;
abortSignal?: AbortSignal;
abortReason: () => string;
}) {
const sessionFile = resolveSessionTranscriptPath(
params.cronSession.sessionEntry.sessionId,
params.agentId,
);
const cronFallbacksOverride = resolveCronFallbacksOverride({
cfg: params.cfg,
job: params.job,
agentId: params.agentId,
});
let runResult: CronPromptRunResult | undefined;
let fallbackProvider = params.liveSelection.provider;
let fallbackModel = params.liveSelection.model;
let runEndedAt = Date.now();
let bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
params.cronSession.sessionEntry.systemPromptReport,
);
const runPrompt = async (promptText: string) => {
const fallbackResult = await runWithModelFallback({
cfg: params.cfgWithAgentDefaults,
provider: params.liveSelection.provider,
model: params.liveSelection.model,
runId: params.cronSession.sessionEntry.sessionId,
agentDir: params.agentDir,
fallbacksOverride: cronFallbacksOverride,
run: async (providerOverride, modelOverride, runOptions) => {
if (params.abortSignal?.aborted) {
throw new Error(params.abortReason());
}
const bootstrapPromptWarningSignature =
bootstrapPromptWarningSignaturesSeen[bootstrapPromptWarningSignaturesSeen.length - 1];
if (isCliProvider(providerOverride, params.cfgWithAgentDefaults)) {
const cliSessionId = params.cronSession.isNewSession
? undefined
: getCliSessionId(params.cronSession.sessionEntry, providerOverride);
const result = await runCliAgent({
sessionId: params.cronSession.sessionEntry.sessionId,
sessionKey: params.agentSessionKey,
agentId: params.agentId,
sessionFile,
workspaceDir: params.workspaceDir,
config: params.cfgWithAgentDefaults,
prompt: promptText,
provider: providerOverride,
model: modelOverride,
thinkLevel: params.thinkLevel,
timeoutMs: params.timeoutMs,
runId: params.cronSession.sessionEntry.sessionId,
cliSessionId,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature,
});
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
result.meta?.systemPromptReport,
);
return result;
}
const result = await runEmbeddedPiAgent({
sessionId: params.cronSession.sessionEntry.sessionId,
sessionKey: params.agentSessionKey,
agentId: params.agentId,
trigger: "cron",
allowGatewaySubagentBinding: true,
senderIsOwner: true,
messageChannel: params.messageChannel,
agentAccountId: params.resolvedDelivery.accountId,
sessionFile,
agentDir: params.agentDir,
workspaceDir: params.workspaceDir,
config: params.cfgWithAgentDefaults,
skillsSnapshot: params.skillsSnapshot,
prompt: promptText,
lane: resolveNestedAgentLane(params.lane),
provider: providerOverride,
model: modelOverride,
authProfileId: params.liveSelection.authProfileId,
authProfileIdSource: params.liveSelection.authProfileId
? params.liveSelection.authProfileIdSource
: undefined,
thinkLevel: params.thinkLevel,
fastMode: resolveFastModeState({
cfg: params.cfgWithAgentDefaults,
provider: providerOverride,
model: modelOverride,
agentId: params.agentId,
sessionEntry: params.cronSession.sessionEntry,
}).enabled,
verboseLevel: params.resolvedVerboseLevel,
timeoutMs: params.timeoutMs,
bootstrapContextMode: params.agentPayload?.lightContext ? "lightweight" : undefined,
bootstrapContextRunKind: "cron",
toolsAllow: params.agentPayload?.toolsAllow,
runId: params.cronSession.sessionEntry.sessionId,
requireExplicitMessageTarget: params.toolPolicy.requireExplicitMessageTarget,
disableMessageTool: params.toolPolicy.disableMessageTool,
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
abortSignal: params.abortSignal,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature,
});
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
result.meta?.systemPromptReport,
);
return result;
},
});
runResult = fallbackResult.result;
fallbackProvider = fallbackResult.provider;
fallbackModel = fallbackResult.model;
params.liveSelection.provider = fallbackResult.provider;
params.liveSelection.model = fallbackResult.model;
runEndedAt = Date.now();
};
return {
runPrompt,
getState: () => ({
runResult,
fallbackProvider,
fallbackModel,
runEndedAt,
liveSelection: params.liveSelection,
}),
};
}
export async function executeCronRun(params: {
cfg: OpenClawConfig;
cfgWithAgentDefaults: OpenClawConfig;
job: CronJob;
agentId: string;
agentDir: string;
agentSessionKey: string;
workspaceDir: string;
lane?: string;
resolvedDelivery: {
channel?: string;
accountId?: string;
};
toolPolicy: {
requireExplicitMessageTarget: boolean;
disableMessageTool: boolean;
};
skillsSnapshot: SkillSnapshot;
agentPayload: AgentTurnPayload;
agentVerboseDefault: AgentDefaultsConfig["verboseDefault"];
liveSelection: CronLiveSelection;
cronSession: MutableCronSession;
commandBody: string;
persistSessionEntry: PersistCronSessionEntry;
abortSignal?: AbortSignal;
abortReason: () => string;
isAborted: () => boolean;
thinkLevel: ThinkLevel | undefined;
timeoutMs: number;
runStartedAt?: number;
}): Promise<CronExecutionResult> {
const resolvedVerboseLevel: VerboseLevel =
normalizeVerboseLevel(params.cronSession.sessionEntry.verboseLevel) ??
normalizeVerboseLevel(params.agentVerboseDefault) ??
"off";
registerAgentRunContext(params.cronSession.sessionEntry.sessionId, {
sessionKey: params.agentSessionKey,
verboseLevel: resolvedVerboseLevel,
});
const executor = createCronPromptExecutor({
cfg: params.cfg,
cfgWithAgentDefaults: params.cfgWithAgentDefaults,
job: params.job,
agentId: params.agentId,
agentDir: params.agentDir,
agentSessionKey: params.agentSessionKey,
workspaceDir: params.workspaceDir,
lane: params.lane,
resolvedVerboseLevel,
thinkLevel: params.thinkLevel,
timeoutMs: params.timeoutMs,
messageChannel: params.resolvedDelivery.channel,
resolvedDelivery: params.resolvedDelivery,
toolPolicy: params.toolPolicy,
skillsSnapshot: params.skillsSnapshot,
agentPayload: params.agentPayload,
liveSelection: params.liveSelection,
cronSession: params.cronSession,
abortSignal: params.abortSignal,
abortReason: params.abortReason,
});
const runStartedAt = params.runStartedAt ?? Date.now();
const MAX_MODEL_SWITCH_RETRIES = 2;
let modelSwitchRetries = 0;
while (true) {
try {
await executor.runPrompt(params.commandBody);
break;
} catch (err) {
if (!(err instanceof LiveSessionModelSwitchError)) {
throw err;
}
modelSwitchRetries += 1;
if (modelSwitchRetries > MAX_MODEL_SWITCH_RETRIES) {
logWarn(
`[cron:${params.job.id}] LiveSessionModelSwitchError retry limit reached (${MAX_MODEL_SWITCH_RETRIES}); aborting`,
);
throw err;
}
params.liveSelection.provider = err.provider;
params.liveSelection.model = err.model;
params.liveSelection.authProfileId = err.authProfileId;
params.liveSelection.authProfileIdSource = err.authProfileId
? err.authProfileIdSource
: undefined;
syncCronSessionLiveSelection({
entry: params.cronSession.sessionEntry,
liveSelection: params.liveSelection,
});
try {
await params.persistSessionEntry();
} catch (persistErr) {
logWarn(
`[cron:${params.job.id}] Failed to persist model switch session entry: ${String(persistErr)}`,
);
}
continue;
}
}
let { runResult, fallbackProvider, fallbackModel, runEndedAt } = executor.getState();
if (!runResult) {
throw new Error("cron isolated run returned no result");
}
if (!params.isAborted()) {
const interimPayloads = runResult.payloads ?? [];
const {
deliveryPayloadHasStructuredContent: interimPayloadHasStructuredContent,
outputText: interimOutputText,
} = resolveCronPayloadOutcome({
payloads: interimPayloads,
runLevelError: runResult.meta?.error,
});
const interimText = interimOutputText?.trim() ?? "";
const shouldRetryInterimAck =
!runResult.meta?.error &&
!runResult.didSendViaMessagingTool &&
!interimPayloadHasStructuredContent &&
!interimPayloads.some((payload) => payload?.isError === true) &&
!listDescendantRunsForRequester(params.agentSessionKey).some((entry) => {
const descendantStartedAt =
typeof entry.startedAt === "number" ? entry.startedAt : entry.createdAt;
return typeof descendantStartedAt === "number" && descendantStartedAt >= runStartedAt;
}) &&
countActiveDescendantRuns(params.agentSessionKey) === 0 &&
isLikelyInterimCronMessage(interimText);
if (shouldRetryInterimAck) {
const continuationPrompt = [
"Your previous response was only an acknowledgement and did not complete this cron task.",
"Complete the original task now.",
"Do not send a status update like 'on it'.",
"Use tools when needed, including sessions_spawn for parallel subtasks, wait for spawned subagents to finish, then return only the final summary.",
].join(" ");
await executor.runPrompt(continuationPrompt);
({ runResult, fallbackProvider, fallbackModel, runEndedAt } = executor.getState());
}
}
if (!runResult) {
throw new Error("cron isolated run returned no result");
}
return {
runResult,
fallbackProvider,
fallbackModel,
runStartedAt,
runEndedAt,
liveSelection: params.liveSelection,
};
}

File diff suppressed because it is too large Load Diff