From cb13be375d334fd9f38e87c836b189fa504eebfc Mon Sep 17 00:00:00 2001 From: ly-wang19 <94427531+ly-wang19@users.noreply.github.com> Date: Wed, 24 Jun 2026 22:10:49 +0800 Subject: [PATCH] fix(tasks): preserve both cron-run session key shapes during maintenance (#96352) * fix(tasks): preserve both cron-run session key shapes during maintenance Session-registry maintenance keeps running cron jobs' session rows, but readRunningCronJobIds built the preserve-set with job.id.toLowerCase() only. Cron-run session keys carry two job-segment shapes: main-session runs use the slugified segment (normalizeCronLaneSegment, e.g. "daily-report") while default-isolated runs use the raw lowercased id ("daily report", built from cron:${job.id} via toAgentStoreSessionKey, which lowercases but does not slugify). The lowercase-only matcher preserved isolated runs but pruned main-session runs of any non-slug job id (e.g. "Daily Report") as stale. Preserve both shapes (raw lowercased id and slugified segment). This is strictly more-preserving, so no live running cron session is dropped. Adds a regression test seeding both a slug main-session run and a raw isolated run for a non-slug job id, asserting both survive while a non-running job's run is still pruned. Co-Authored-By: Claude Opus 4.8 (1M context) * fix(tasks): match cron session keys to target shape * fix(tasks): preserve active cron aliases across retargeting * fix(tasks): retain explicit cron session aliases --------- Co-authored-by: ly-wang19 Co-authored-by: Claude Opus 4.8 (1M context) Co-authored-by: Vincent Koc --- src/commands/tasks.test.ts | 119 +++++++++++++++++++++++++++++++++++++ src/commands/tasks.ts | 40 +++++++++---- 2 files changed, 149 insertions(+), 10 deletions(-) diff --git a/src/commands/tasks.test.ts b/src/commands/tasks.test.ts index 9eaba5f87aa..20e5f1f0e83 100644 --- a/src/commands/tasks.test.ts +++ b/src/commands/tasks.test.ts @@ -370,6 +370,125 @@ describe("tasks commands", () => { }); }); + it("preserves both cron-run session key shapes for a running non-slug job id", async () => { + await withTaskCommandStateDir(async (state) => { + const now = Date.now(); + const old = now - 8 * 24 * 60 * 60_000; + await saveCronStore(state.statePath("cron", "jobs.json"), { + version: 1, + jobs: [ + { + id: "Daily Report", + name: "Daily Report", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + sessionKey: "cron:daily-report", + wakeMode: "now", + payload: { kind: "agentTurn", message: "ping" }, + delivery: { mode: "none" }, + createdAtMs: now, + updatedAtMs: now, + state: { runningAtMs: now - 5_000 }, + }, + ], + }); + + const sessionsDir = state.sessionsDir("main"); + const storePath = path.join(sessionsDir, "sessions.json"); + await fs.mkdir(sessionsDir, { recursive: true }); + // A running job can be retargeted after its session is created, so maintenance must preserve + // both the raw and slugged historical shapes. + const slugKey = "agent:main:cron:daily-report:run:old-run"; + const rawKey = "agent:main:cron:daily report:run:old-run"; + const retiredKey = "agent:main:cron:retired-job:run:old-run"; + await fs.writeFile( + storePath, + JSON.stringify( + { + [slugKey]: { sessionId: "slug-run", updatedAt: old }, + [rawKey]: { sessionId: "raw-run", updatedAt: old }, + [retiredKey]: { sessionId: "retired-run", updatedAt: old }, + }, + null, + 2, + ), + "utf8", + ); + + const runtime = createRuntime(); + await tasksMaintenanceCommand({ json: true, apply: true }, runtime); + + const payload = readFirstJsonLog(runtime) as { + maintenance: { sessions: { runningCronJobs: number } }; + }; + expect(payload.maintenance.sessions.runningCronJobs).toBe(1); + const updated = JSON.parse(await fs.readFile(storePath, "utf8")) as Record; + expect(updated[slugKey]).toBeDefined(); + expect(updated[rawKey]).toBeDefined(); + expect(updated[retiredKey]).toBeUndefined(); + }); + }); + + it("preserves a running cron session with an explicit session key", async () => { + await withTaskCommandStateDir(async (state) => { + const now = Date.now(); + const old = now - 8 * 24 * 60 * 60_000; + await saveCronStore(state.statePath("cron", "jobs.json"), { + version: 1, + jobs: [ + { + id: "job-uuid", + name: "Daily monitor", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + sessionKey: "cron:daily-monitor", + wakeMode: "now", + payload: { kind: "agentTurn", message: "ping" }, + delivery: { mode: "none" }, + createdAtMs: now, + updatedAtMs: now, + state: { runningAtMs: now - 5_000 }, + }, + ], + }); + + const sessionsDir = state.sessionsDir("main"); + const storePath = path.join(sessionsDir, "sessions.json"); + await fs.mkdir(sessionsDir, { recursive: true }); + await fs.writeFile( + storePath, + JSON.stringify( + { + "agent:main:cron:daily-monitor:run:old-run": { + sessionId: "explicit-run", + updatedAt: old, + }, + "agent:main:cron:job-uuid:run:old-run": { + sessionId: "job-id-run", + updatedAt: old, + }, + "agent:main:cron:retired-job:run:old-run": { + sessionId: "retired-run", + updatedAt: old, + }, + }, + null, + 2, + ), + "utf8", + ); + + const runtime = createRuntime(); + await tasksMaintenanceCommand({ json: true, apply: true }, runtime); + + const updated = JSON.parse(await fs.readFile(storePath, "utf8")) as Record; + expect(updated["agent:main:cron:daily-monitor:run:old-run"]).toBeDefined(); + expect(updated["agent:main:cron:retired-job:run:old-run"]).toBeUndefined(); + }); + }); + it("does not build JSON-only diagnostics for text maintenance output", async () => { await withTaskCommandStateDir(async () => { const diagnosticsSpy = vi.spyOn( diff --git a/src/commands/tasks.ts b/src/commands/tasks.ts index 578a6a9f0d7..a00a0775265 100644 --- a/src/commands/tasks.ts +++ b/src/commands/tasks.ts @@ -11,6 +11,7 @@ import { resolveAllAgentSessionStoreTargetsSync, runSessionRegistryMaintenanceForStore, } from "../config/sessions.js"; +import { normalizeCronLaneSegment } from "../cron/service/task-runs.js"; import { loadCronJobsStoreSync, resolveCronJobsStorePath } from "../cron/store.js"; import type { RuntimeEnv } from "../runtime.js"; import { getTaskById, updateTaskNotifyPolicyById } from "../tasks/runtime-internal.js"; @@ -128,17 +129,36 @@ type SessionRegistryMaintenanceSummary = { stores: SessionRegistryMaintenanceStoreSummary[]; }; -function readRunningCronJobIds(): Set { +function resolveExplicitCronSessionSegment(sessionKey: string | undefined): string | undefined { + const match = /^(?:agent:[^:]+:)?cron:([^:]+)$/u.exec(sessionKey?.trim() ?? ""); + return match?.[1]?.toLowerCase(); +} + +function readRunningCronJobIds(): { ids: Set; count: number } { try { const cronStorePath = resolveCronJobsStorePath(getRuntimeConfig().cron?.store); - return new Set( - loadCronJobsStoreSync(cronStorePath) - .jobs.filter((job) => typeof job.state?.runningAtMs === "number") - // Cron session keys are matched case-insensitively against job ids. - .map((job) => job.id.toLowerCase()), + const runningJobs = loadCronJobsStoreSync(cronStorePath).jobs.filter( + (job) => typeof job.state?.runningAtMs === "number", ); + return { + // A running job may have been retargeted after its session was created. Keep both historical + // shapes; the registry has no producer metadata, so retaining an ambiguous alias is safer + // than pruning a live transcript. + ids: new Set( + runningJobs.flatMap((job) => [ + job.id.toLowerCase(), + normalizeCronLaneSegment(job.id, "job"), + ...(job.sessionTarget !== "main" && job.sessionKey + ? [resolveExplicitCronSessionSegment(job.sessionKey)].filter( + (segment): segment is string => segment !== undefined, + ) + : []), + ]), + ), + count: runningJobs.length, + }; } catch { - return new Set(); + return { ids: new Set(), count: 0 }; } } @@ -146,13 +166,13 @@ async function runSessionRegistryMaintenance(params: { apply: boolean; }): Promise { const cfg = getRuntimeConfig(); - const runningCronJobIds = readRunningCronJobIds(); + const runningCronJobs = readRunningCronJobIds(); const stores: SessionRegistryMaintenanceStoreSummary[] = []; for (const target of resolveAllAgentSessionStoreTargetsSync(cfg)) { const result = await runSessionRegistryMaintenanceForStore({ apply: params.apply, retentionMs: SESSION_REGISTRY_RETENTION_MS, - runningCronJobIds, + runningCronJobIds: runningCronJobs.ids, storePath: target.storePath, }); stores.push({ @@ -166,7 +186,7 @@ async function runSessionRegistryMaintenance(params: { } return { retentionMs: SESSION_REGISTRY_RETENTION_MS, - runningCronJobs: runningCronJobIds.size, + runningCronJobs: runningCronJobs.count, pruned: stores.reduce((total, store) => total + store.pruned, 0), stores, };