diff --git a/src/cron/run-log.ts b/src/cron/run-log.ts index 0bca59f4aca..d98248f81b9 100644 --- a/src/cron/run-log.ts +++ b/src/cron/run-log.ts @@ -29,6 +29,7 @@ export type CronRunLogEntry = { delivery?: CronDeliveryTrace; sessionId?: string; sessionKey?: string; + runId?: string; runAtMs?: number; durationMs?: number; nextRunAtMs?: number; @@ -310,6 +311,7 @@ function parseAllRunLogEntries(raw: string, opts?: { jobId?: string }): CronRunL status: obj.status, error: obj.error, summary: obj.summary, + runId: typeof obj.runId === "string" && obj.runId.trim() ? obj.runId : undefined, runAtMs: obj.runAtMs, durationMs: obj.durationMs, nextRunAtMs: obj.nextRunAtMs, diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index f8c352a44de..2dabd216a91 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -439,6 +439,7 @@ type PreparedManualRun = ok: true; ran: true; jobId: string; + runId?: string; taskRunId?: string; startedAt: number; executionJob: CronJob; @@ -507,8 +508,9 @@ function tryCreateManualTaskRun(params: { state: CronServiceState; job: CronJob; startedAt: number; + runId?: string; }): string | undefined { - const runId = createCronExecutionId(params.job.id, params.startedAt); + const runId = params.runId ?? createCronExecutionId(params.job.id, params.startedAt); try { createRunningTaskRun({ runtime: "cron", @@ -630,6 +632,7 @@ async function prepareManualRun( state: CronServiceState, id: string, mode?: "due" | "force", + opts?: { runId?: string }, ): Promise { const preflight = await inspectManualRunPreflight(state, id, mode); if (!preflight.ok) { @@ -659,12 +662,14 @@ async function prepareManualRun( state, job, startedAt: preflight.now, + runId: opts?.runId, }); const executionJob = structuredClone(job); return { ok: true, ran: true, jobId: job.id, + runId: opts?.runId ?? taskRunId, taskRunId, startedAt: preflight.now, executionJob, @@ -681,6 +686,7 @@ async function finishPreparedManualRun( const startedAt = prepared.startedAt; const jobId = prepared.jobId; const taskRunId = prepared.taskRunId; + const runId = prepared.runId; let coreResult: Awaited>; try { @@ -728,6 +734,7 @@ async function finishPreparedManualRun( delivery: coreResult.delivery, sessionId: coreResult.sessionId, sessionKey: coreResult.sessionKey, + runId, runAtMs: startedAt, durationMs: job.state.lastDurationMs, nextRunAtMs: job.state.nextRunAtMs, @@ -767,8 +774,13 @@ async function finishPreparedManualRun( }); } -export async function run(state: CronServiceState, id: string, mode?: "due" | "force") { - const prepared = await prepareManualRun(state, id, mode); +export async function run( + state: CronServiceState, + id: string, + mode?: "due" | "force", + opts?: { runId?: string }, +) { + const prepared = await prepareManualRun(state, id, mode, opts); if (!prepared.ok || !prepared.ran) { return prepared; } @@ -786,7 +798,7 @@ export async function enqueueRun(state: CronServiceState, id: string, mode?: "du void enqueueCommandInLane( CommandLane.Cron, async () => { - const result = await run(state, id, mode); + const result = await run(state, id, mode, { runId }); if (result.ok && "ran" in result && !result.ran) { state.deps.log.info( { jobId: id, runId, reason: result.reason }, diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index e81bea18aec..de2f0ec1581 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -30,6 +30,7 @@ export type CronEvent = { delivery?: CronDeliveryTrace; sessionId?: string; sessionKey?: string; + runId?: string; nextRunAtMs?: number; } & CronRunTelemetry; diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index b76c7af96fb..f1b268763d2 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -365,6 +365,7 @@ export const CronRunLogEntrySchema = Type.Object( deliveryError: Type.Optional(Type.String()), sessionId: Type.Optional(NonEmptyString), sessionKey: Type.Optional(NonEmptyString), + runId: Type.Optional(NonEmptyString), runAtMs: Type.Optional(Type.Integer({ minimum: 0 })), durationMs: Type.Optional(Type.Integer({ minimum: 0 })), nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })), diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index 3d13d406970..9e4f4eaae2a 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -371,6 +371,7 @@ export function buildGatewayCronService(params: { "deliveryError", "sessionId", "sessionKey", + "runId", "nextRunAtMs", "model", "provider", @@ -410,6 +411,7 @@ export function buildGatewayCronService(params: { delivery: evt.delivery, sessionId: evt.sessionId, sessionKey: evt.sessionKey, + runId: evt.runId, runAtMs: evt.runAtMs, durationMs: evt.durationMs, nextRunAtMs: evt.nextRunAtMs, diff --git a/src/gateway/server.cron.test.ts b/src/gateway/server.cron.test.ts index 584babcec16..e8cd6fb5972 100644 --- a/src/gateway/server.cron.test.ts +++ b/src/gateway/server.cron.test.ts @@ -861,6 +861,8 @@ describe("gateway server cron", () => { const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }, 20_000); expect(runRes.ok).toBe(true); expect(runRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) }); + const manualRunId = (runRes.payload as { runId?: unknown } | null)?.runId; + expect(typeof manualRunId).toBe("string"); const finishedPayload = await finishedRun; expect(finishedPayload).toMatchObject({ jobId, @@ -879,6 +881,7 @@ describe("gateway server cron", () => { expect((entries as Array<{ deliveryStatus?: unknown }>).at(-1)?.deliveryStatus).toBe( "not-requested", ); + expect((entries as Array<{ runId?: unknown }>).at(-1)?.runId).toBe(manualRunId); const allRunsRes = await rpcReq(ws, "cron.runs", { scope: "all", limit: 50,