mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 07:50:43 +00:00
fix(cron): persist manual run ids in history
This commit is contained in:
@@ -29,6 +29,7 @@ export type CronRunLogEntry = {
|
||||
delivery?: CronDeliveryTrace;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
runId?: string;
|
||||
runAtMs?: number;
|
||||
durationMs?: number;
|
||||
nextRunAtMs?: number;
|
||||
@@ -310,6 +311,7 @@ function parseAllRunLogEntries(raw: string, opts?: { jobId?: string }): CronRunL
|
||||
status: obj.status,
|
||||
error: obj.error,
|
||||
summary: obj.summary,
|
||||
runId: typeof obj.runId === "string" && obj.runId.trim() ? obj.runId : undefined,
|
||||
runAtMs: obj.runAtMs,
|
||||
durationMs: obj.durationMs,
|
||||
nextRunAtMs: obj.nextRunAtMs,
|
||||
|
||||
@@ -439,6 +439,7 @@ type PreparedManualRun =
|
||||
ok: true;
|
||||
ran: true;
|
||||
jobId: string;
|
||||
runId?: string;
|
||||
taskRunId?: string;
|
||||
startedAt: number;
|
||||
executionJob: CronJob;
|
||||
@@ -507,8 +508,9 @@ function tryCreateManualTaskRun(params: {
|
||||
state: CronServiceState;
|
||||
job: CronJob;
|
||||
startedAt: number;
|
||||
runId?: string;
|
||||
}): string | undefined {
|
||||
const runId = createCronExecutionId(params.job.id, params.startedAt);
|
||||
const runId = params.runId ?? createCronExecutionId(params.job.id, params.startedAt);
|
||||
try {
|
||||
createRunningTaskRun({
|
||||
runtime: "cron",
|
||||
@@ -630,6 +632,7 @@ async function prepareManualRun(
|
||||
state: CronServiceState,
|
||||
id: string,
|
||||
mode?: "due" | "force",
|
||||
opts?: { runId?: string },
|
||||
): Promise<PreparedManualRun> {
|
||||
const preflight = await inspectManualRunPreflight(state, id, mode);
|
||||
if (!preflight.ok) {
|
||||
@@ -659,12 +662,14 @@ async function prepareManualRun(
|
||||
state,
|
||||
job,
|
||||
startedAt: preflight.now,
|
||||
runId: opts?.runId,
|
||||
});
|
||||
const executionJob = structuredClone(job);
|
||||
return {
|
||||
ok: true,
|
||||
ran: true,
|
||||
jobId: job.id,
|
||||
runId: opts?.runId ?? taskRunId,
|
||||
taskRunId,
|
||||
startedAt: preflight.now,
|
||||
executionJob,
|
||||
@@ -681,6 +686,7 @@ async function finishPreparedManualRun(
|
||||
const startedAt = prepared.startedAt;
|
||||
const jobId = prepared.jobId;
|
||||
const taskRunId = prepared.taskRunId;
|
||||
const runId = prepared.runId;
|
||||
|
||||
let coreResult: Awaited<ReturnType<typeof executeJobCoreWithTimeout>>;
|
||||
try {
|
||||
@@ -728,6 +734,7 @@ async function finishPreparedManualRun(
|
||||
delivery: coreResult.delivery,
|
||||
sessionId: coreResult.sessionId,
|
||||
sessionKey: coreResult.sessionKey,
|
||||
runId,
|
||||
runAtMs: startedAt,
|
||||
durationMs: job.state.lastDurationMs,
|
||||
nextRunAtMs: job.state.nextRunAtMs,
|
||||
@@ -767,8 +774,13 @@ async function finishPreparedManualRun(
|
||||
});
|
||||
}
|
||||
|
||||
export async function run(state: CronServiceState, id: string, mode?: "due" | "force") {
|
||||
const prepared = await prepareManualRun(state, id, mode);
|
||||
export async function run(
|
||||
state: CronServiceState,
|
||||
id: string,
|
||||
mode?: "due" | "force",
|
||||
opts?: { runId?: string },
|
||||
) {
|
||||
const prepared = await prepareManualRun(state, id, mode, opts);
|
||||
if (!prepared.ok || !prepared.ran) {
|
||||
return prepared;
|
||||
}
|
||||
@@ -786,7 +798,7 @@ export async function enqueueRun(state: CronServiceState, id: string, mode?: "du
|
||||
void enqueueCommandInLane(
|
||||
CommandLane.Cron,
|
||||
async () => {
|
||||
const result = await run(state, id, mode);
|
||||
const result = await run(state, id, mode, { runId });
|
||||
if (result.ok && "ran" in result && !result.ran) {
|
||||
state.deps.log.info(
|
||||
{ jobId: id, runId, reason: result.reason },
|
||||
|
||||
@@ -30,6 +30,7 @@ export type CronEvent = {
|
||||
delivery?: CronDeliveryTrace;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
runId?: string;
|
||||
nextRunAtMs?: number;
|
||||
} & CronRunTelemetry;
|
||||
|
||||
|
||||
@@ -365,6 +365,7 @@ export const CronRunLogEntrySchema = Type.Object(
|
||||
deliveryError: Type.Optional(Type.String()),
|
||||
sessionId: Type.Optional(NonEmptyString),
|
||||
sessionKey: Type.Optional(NonEmptyString),
|
||||
runId: Type.Optional(NonEmptyString),
|
||||
runAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
durationMs: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
|
||||
@@ -371,6 +371,7 @@ export function buildGatewayCronService(params: {
|
||||
"deliveryError",
|
||||
"sessionId",
|
||||
"sessionKey",
|
||||
"runId",
|
||||
"nextRunAtMs",
|
||||
"model",
|
||||
"provider",
|
||||
@@ -410,6 +411,7 @@ export function buildGatewayCronService(params: {
|
||||
delivery: evt.delivery,
|
||||
sessionId: evt.sessionId,
|
||||
sessionKey: evt.sessionKey,
|
||||
runId: evt.runId,
|
||||
runAtMs: evt.runAtMs,
|
||||
durationMs: evt.durationMs,
|
||||
nextRunAtMs: evt.nextRunAtMs,
|
||||
|
||||
@@ -861,6 +861,8 @@ describe("gateway server cron", () => {
|
||||
const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }, 20_000);
|
||||
expect(runRes.ok).toBe(true);
|
||||
expect(runRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
|
||||
const manualRunId = (runRes.payload as { runId?: unknown } | null)?.runId;
|
||||
expect(typeof manualRunId).toBe("string");
|
||||
const finishedPayload = await finishedRun;
|
||||
expect(finishedPayload).toMatchObject({
|
||||
jobId,
|
||||
@@ -879,6 +881,7 @@ describe("gateway server cron", () => {
|
||||
expect((entries as Array<{ deliveryStatus?: unknown }>).at(-1)?.deliveryStatus).toBe(
|
||||
"not-requested",
|
||||
);
|
||||
expect((entries as Array<{ runId?: unknown }>).at(-1)?.runId).toBe(manualRunId);
|
||||
const allRunsRes = await rpcReq(ws, "cron.runs", {
|
||||
scope: "all",
|
||||
limit: 50,
|
||||
|
||||
Reference in New Issue
Block a user