Files
openclaw/src/cron/service/state.ts
Devin Robison f61896b03c fix(cron): preserve untrusted awareness event labels (#68210)
* fix(cron): preserve untrusted awareness event labels

Keep isolated cron awareness summaries untrusted when they are promoted into the main session, and forward explicit trust downgrades through the gateway cron wrapper. Add focused regression coverage for both paths.

* changelog: note cron awareness untrusted-label preservation (#68210)
2026-04-17 12:43:48 -06:00

170 lines
5.1 KiB
TypeScript

import type { CronConfig } from "../../config/types.cron.js";
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import type {
CronDeliveryStatus,
CronJob,
CronJobCreate,
CronJobPatch,
CronMessageChannel,
CronRunOutcome,
CronRunStatus,
CronRunTelemetry,
CronStoreFile,
} from "../types.js";
export type CronEvent = {
jobId: string;
action: "added" | "updated" | "removed" | "started" | "finished";
runAtMs?: number;
durationMs?: number;
status?: CronRunStatus;
error?: string;
summary?: string;
delivered?: boolean;
deliveryStatus?: CronDeliveryStatus;
deliveryError?: string;
sessionId?: string;
sessionKey?: string;
nextRunAtMs?: number;
} & CronRunTelemetry;
export type Logger = {
debug: (obj: unknown, msg?: string) => void;
info: (obj: unknown, msg?: string) => void;
warn: (obj: unknown, msg?: string) => void;
error: (obj: unknown, msg?: string) => void;
};
export type CronServiceDeps = {
nowMs?: () => number;
log: Logger;
storePath: string;
cronEnabled: boolean;
/** CronConfig for session retention settings. */
cronConfig?: CronConfig;
/** Default agent id for jobs without an agent id. */
defaultAgentId?: string;
/** Resolve session store path for a given agent id. */
resolveSessionStorePath?: (agentId?: string) => string;
/** Path to the session store (sessions.json) for reaper use. */
sessionStorePath?: string;
/**
* Delay in ms between missed job executions on startup.
* Prevents overwhelming the gateway when many jobs are overdue.
* See: https://github.com/openclaw/openclaw/issues/18892
*/
missedJobStaggerMs?: number;
/**
* Maximum number of missed jobs to run immediately on startup.
* Additional missed jobs will be rescheduled to fire gradually.
* See: https://github.com/openclaw/openclaw/issues/18892
*/
maxMissedJobsPerRestart?: number;
enqueueSystemEvent: (
text: string,
opts?: { agentId?: string; sessionKey?: string; contextKey?: string; trusted?: boolean },
) => void;
requestHeartbeatNow: (opts?: { reason?: string; agentId?: string; sessionKey?: string }) => void;
runHeartbeatOnce?: (opts?: {
reason?: string;
agentId?: string;
sessionKey?: string;
/** Optional heartbeat config override (e.g. target: "last" for cron-triggered heartbeats). */
heartbeat?: { target?: string };
}) => Promise<HeartbeatRunResult>;
/**
* WakeMode=now: max time to wait for runHeartbeatOnce to stop returning
* { status:"skipped", reason:"requests-in-flight" } before falling back to
* requestHeartbeatNow.
*/
wakeNowHeartbeatBusyMaxWaitMs?: number;
/** WakeMode=now: delay between runHeartbeatOnce retries while busy. */
wakeNowHeartbeatBusyRetryDelayMs?: number;
runIsolatedAgentJob: (params: {
job: CronJob;
message: string;
abortSignal?: AbortSignal;
}) => Promise<
{
summary?: string;
/** Last non-empty agent text output (not truncated). */
outputText?: string;
/**
* `true` when the isolated run already delivered its output to the target
* channel (including matching messaging-tool sends). See:
* https://github.com/openclaw/openclaw/issues/15692
*/
delivered?: boolean;
/**
* `true` when announce/direct delivery was attempted for this run, even
* if the final per-message ack status is uncertain.
*/
deliveryAttempted?: boolean;
} & CronRunOutcome &
CronRunTelemetry
>;
sendCronFailureAlert?: (params: {
job: CronJob;
text: string;
channel: CronMessageChannel;
to?: string;
mode?: "announce" | "webhook";
accountId?: string;
}) => Promise<void>;
onEvent?: (evt: CronEvent) => void;
};
export type CronServiceDepsInternal = Omit<CronServiceDeps, "nowMs"> & {
nowMs: () => number;
};
export type CronServiceState = {
deps: CronServiceDepsInternal;
store: CronStoreFile | null;
timer: NodeJS.Timeout | null;
running: boolean;
op: Promise<unknown>;
warnedDisabled: boolean;
storeLoadedAtMs: number | null;
storeFileMtimeMs: number | null;
};
export function createCronServiceState(deps: CronServiceDeps): CronServiceState {
return {
deps: { ...deps, nowMs: deps.nowMs ?? (() => Date.now()) },
store: null,
timer: null,
running: false,
op: Promise.resolve(),
warnedDisabled: false,
storeLoadedAtMs: null,
storeFileMtimeMs: null,
};
}
export type CronRunMode = "due" | "force";
export type CronWakeMode = "now" | "next-heartbeat";
export type CronStatusSummary = {
enabled: boolean;
storePath: string;
jobs: number;
nextWakeAtMs: number | null;
};
export type CronRunResult =
| { ok: true; ran: true }
| { ok: true; enqueued: true; runId: string }
| { ok: true; ran: false; reason: "not-due" }
| { ok: true; ran: false; reason: "already-running" }
| { ok: false };
export type CronRemoveResult = { ok: true; removed: boolean } | { ok: false; removed: false };
export type CronAddResult = CronJob;
export type CronUpdateResult = CronJob;
export type CronListResult = CronJob[];
export type CronAddInput = CronJobCreate;
export type CronUpdateInput = CronJobPatch;