From 021f753f72d6ffd2ff5130bc0b1eefd97883fd34 Mon Sep 17 00:00:00 2001 From: brokemac79 Date: Wed, 29 Apr 2026 03:40:04 +0100 Subject: [PATCH] fix(tasks): prune stale cron session registry entries --- src/commands/tasks.test.ts | 120 +++++++++++++++++++++++++++- src/commands/tasks.ts | 155 ++++++++++++++++++++++++++++++++++++- 2 files changed, 271 insertions(+), 4 deletions(-) diff --git a/src/commands/tasks.test.ts b/src/commands/tasks.test.ts index e4033ee4340..ee5f4daa9a2 100644 --- a/src/commands/tasks.test.ts +++ b/src/commands/tasks.test.ts @@ -1,3 +1,5 @@ +import fs from "node:fs/promises"; +import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { RuntimeEnv } from "../runtime.js"; import { @@ -10,6 +12,7 @@ import { resetTaskRegistryForTests, } from "../tasks/task-registry.js"; import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js"; +import type { OpenClawTestState } from "../test-utils/openclaw-test-state.js"; import { tasksAuditCommand, tasksMaintenanceCommand } from "./tasks.js"; function createRuntime(): RuntimeEnv { @@ -29,15 +32,17 @@ const zeroTaskAuditCounts = { stale_running: 0, }; -async function withTaskCommandStateDir(run: () => Promise): Promise { +async function withTaskCommandStateDir( + run: (state: OpenClawTestState) => Promise, +): Promise { await withOpenClawTestState( { layout: "state-only", prefix: "openclaw-tasks-command-" }, - async () => { + async (state) => { resetTaskRegistryDeliveryRuntimeForTests(); resetTaskRegistryForTests({ persist: false }); resetTaskFlowRegistryForTests({ persist: false }); try { - await run(); + await run(state); } finally { resetTaskRegistryDeliveryRuntimeForTests(); resetTaskRegistryForTests({ persist: false }); @@ -163,4 +168,113 @@ describe("tasks commands", () => { expect(payload.auditAfter.taskFlows.byCode.stale_running).toBe(0); }); }); + + it("applies a conservative session registry sweep for stale cron run sessions", async () => { + await withTaskCommandStateDir(async (state) => { + const now = Date.now(); + vi.useFakeTimers(); + vi.setSystemTime(now); + const sessionsDir = state.sessionsDir("main"); + const storePath = path.join(sessionsDir, "sessions.json"); + const old = now - 8 * 24 * 60 * 60_000; + await fs.mkdir(sessionsDir, { recursive: true }); + await fs.writeFile( + storePath, + JSON.stringify( + { + "agent:main:cron:done-job:run:old-run": { + sessionId: "done-run", + updatedAt: old, + }, + "agent:main:cron:running-job:run:old-run": { + sessionId: "running-run", + updatedAt: old, + }, + "agent:main:cron:done-job:run:recent-run": { + sessionId: "recent-run", + updatedAt: now - 60_000, + }, + "agent:main:telegram:dm:old": { + sessionId: "ordinary-old-session", + updatedAt: old, + }, + }, + null, + 2, + ), + "utf-8", + ); + await state.writeJson("cron/jobs.json", { + version: 1, + jobs: [ + { + id: "running-job", + name: "Running job", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + sessionKey: "cron:running-job", + wakeMode: "now", + payload: { kind: "agentTurn", message: "ping" }, + delivery: { mode: "none" }, + createdAtMs: now, + updatedAtMs: now, + state: {}, + }, + { + id: "done-job", + name: "Done job", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + sessionKey: "cron:done-job", + wakeMode: "now", + payload: { kind: "agentTurn", message: "ping" }, + delivery: { mode: "none" }, + createdAtMs: now, + updatedAtMs: now, + state: {}, + }, + ], + }); + await state.writeJson("cron/jobs-state.json", { + version: 1, + jobs: { + "running-job": { + updatedAtMs: now, + state: { runningAtMs: now - 5_000 }, + }, + "done-job": { + updatedAtMs: now, + state: {}, + }, + }, + }); + + const runtime = createRuntime(); + await tasksMaintenanceCommand({ json: true, apply: true }, runtime); + + const payload = JSON.parse(String(vi.mocked(runtime.log).mock.calls[0]?.[0])) as { + maintenance: { + sessions: { + pruned: number; + runningCronJobs: number; + stores: Array<{ pruned: number; preservedRunning: number }>; + }; + }; + }; + expect(payload.maintenance.sessions.pruned).toBe(1); + expect(payload.maintenance.sessions.runningCronJobs).toBe(1); + expect(payload.maintenance.sessions.stores[0]).toMatchObject({ + pruned: 1, + preservedRunning: 1, + }); + + const updated = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record; + expect(updated["agent:main:cron:done-job:run:old-run"]).toBeUndefined(); + expect(updated["agent:main:cron:running-job:run:old-run"]).toBeDefined(); + expect(updated["agent:main:cron:done-job:run:recent-run"]).toBeDefined(); + expect(updated["agent:main:telegram:dm:old"]).toBeDefined(); + }); + }); }); diff --git a/src/commands/tasks.ts b/src/commands/tasks.ts index 0afe94d7abd..83ccc37512a 100644 --- a/src/commands/tasks.ts +++ b/src/commands/tasks.ts @@ -1,7 +1,16 @@ +import fs from "node:fs"; import { formatCliCommand } from "../cli/command-format.js"; import { getRuntimeConfig } from "../config/config.js"; -import { resolveCronStorePath } from "../cron/store.js"; +import { + loadSessionStore, + pruneStaleEntries, + resolveAllAgentSessionStoreTargetsSync, + updateSessionStore, + type SessionEntry, +} from "../config/sessions.js"; +import { loadCronStoreSync, resolveCronStorePath } from "../cron/store.js"; import type { RuntimeEnv } from "../runtime.js"; +import { parseAgentSessionKey } from "../sessions/session-key-utils.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; import { getTaskById, updateTaskNotifyPolicyById } from "../tasks/runtime-internal.js"; import { cancelDetachedTaskRunById } from "../tasks/task-executor.js"; @@ -44,6 +53,7 @@ const STATUS_PAD = 10; const DELIVERY_PAD = 14; const ID_PAD = 10; const RUN_PAD = 10; +const SESSION_REGISTRY_RETENTION_MS = 7 * 24 * 60 * 60_000; const info = theme.info; @@ -62,6 +72,142 @@ function configureTaskMaintenanceFromConfig(): void { }); } +type SessionRegistryMaintenanceStoreSummary = { + agentId: string; + storePath: string; + beforeCount: number; + afterCount: number; + pruned: number; + preservedRunning: number; +}; + +type SessionRegistryMaintenanceSummary = { + retentionMs: number; + runningCronJobs: number; + pruned: number; + stores: SessionRegistryMaintenanceStoreSummary[]; +}; + +function parseCronRunSessionJobId(sessionKey: string): string | undefined { + const parsed = parseAgentSessionKey(sessionKey); + if (!parsed) { + return undefined; + } + return /^cron:([^:]+):run:[^:]+$/u.exec(parsed.rest)?.[1]; +} + +function readRunningCronJobIds(): Set { + try { + const cronStorePath = resolveCronStorePath(getRuntimeConfig().cron?.store); + return new Set( + loadCronStoreSync(cronStorePath) + .jobs.filter((job) => typeof job.state?.runningAtMs === "number") + .map((job) => job.id.toLowerCase()), + ); + } catch { + return new Set(); + } +} + +function buildSessionRegistryPreserveKeys(params: { + store: Record; + runningCronJobIds: ReadonlySet; +}): { preserveKeys: Set; preservedRunning: number } { + const preserveKeys = new Set(); + let preservedRunning = 0; + for (const key of Object.keys(params.store)) { + const jobId = parseCronRunSessionJobId(key); + if (!jobId) { + preserveKeys.add(key); + continue; + } + if (params.runningCronJobIds.has(jobId)) { + preserveKeys.add(key); + preservedRunning += 1; + } + } + return { preserveKeys, preservedRunning }; +} + +async function runSessionRegistryMaintenance(params: { + apply: boolean; +}): Promise { + const cfg = getRuntimeConfig(); + const runningCronJobIds = readRunningCronJobIds(); + const stores: SessionRegistryMaintenanceStoreSummary[] = []; + for (const target of resolveAllAgentSessionStoreTargetsSync(cfg)) { + if (!fs.existsSync(target.storePath)) { + stores.push({ + agentId: target.agentId, + storePath: target.storePath, + beforeCount: 0, + afterCount: 0, + pruned: 0, + preservedRunning: 0, + }); + continue; + } + const beforeStore = loadSessionStore(target.storePath, { skipCache: true }); + const beforeCount = Object.keys(beforeStore).length; + const { preservedRunning } = buildSessionRegistryPreserveKeys({ + store: beforeStore, + runningCronJobIds, + }); + if (params.apply) { + const applied = await updateSessionStore( + target.storePath, + (store) => { + const { preserveKeys } = buildSessionRegistryPreserveKeys({ + store, + runningCronJobIds, + }); + const pruned = pruneStaleEntries(store, SESSION_REGISTRY_RETENTION_MS, { + log: false, + preserveKeys, + }); + return { + pruned, + afterCount: Object.keys(store).length, + }; + }, + { skipMaintenance: true }, + ); + stores.push({ + agentId: target.agentId, + storePath: target.storePath, + beforeCount, + afterCount: applied.afterCount, + pruned: applied.pruned, + preservedRunning, + }); + continue; + } + const previewStore = structuredClone(beforeStore); + const { preserveKeys } = buildSessionRegistryPreserveKeys({ + store: previewStore, + runningCronJobIds, + }); + const pruned = pruneStaleEntries(previewStore, SESSION_REGISTRY_RETENTION_MS, { + log: false, + preserveKeys, + }); + stores.push({ + agentId: target.agentId, + storePath: target.storePath, + beforeCount, + afterCount: Object.keys(previewStore).length, + pruned, + preservedRunning, + }); + } + return { + retentionMs: SESSION_REGISTRY_RETENTION_MS, + runningCronJobs: runningCronJobIds.size, + pruned: stores.reduce((total, store) => total + store.pruned, 0), + stores, + }; +} + function truncate(value: string, maxChars: number) { if (value.length <= maxChars) { return value; @@ -517,6 +663,7 @@ export async function tasksMaintenanceCommand( const flowMaintenance = opts.apply ? await runTaskFlowRegistryMaintenance() : previewTaskFlowRegistryMaintenance(); + const sessionMaintenance = await runSessionRegistryMaintenance({ apply: Boolean(opts.apply) }); const summary = getInspectableTaskRegistrySummary(); const auditAfter = opts.apply ? getInspectableTaskAuditSummary() : auditBefore; const flowAuditAfter = opts.apply ? getInspectableTaskFlowAuditSummary() : flowAuditBefore; @@ -529,6 +676,7 @@ export async function tasksMaintenanceCommand( maintenance: { tasks: taskMaintenance, taskFlows: flowMaintenance, + sessions: sessionMaintenance, }, tasks: summary, auditBefore: { @@ -552,6 +700,11 @@ export async function tasksMaintenanceCommand( `Tasks maintenance (${opts.apply ? "applied" : "preview"}): tasks ${taskMaintenance.reconciled} reconcile · ${taskMaintenance.recovered} recovered · ${taskMaintenance.cleanupStamped} cleanup stamp · ${taskMaintenance.pruned} prune; task-flows ${flowMaintenance.reconciled} reconcile · ${flowMaintenance.pruned} prune`, ), ); + runtime.log( + info( + `Session registry: ${sessionMaintenance.pruned} prune · ${sessionMaintenance.runningCronJobs} running cron jobs`, + ), + ); runtime.log( info( `${opts.apply ? "Tasks health after apply" : "Tasks health"}: ${summary.byStatus.queued} queued · ${summary.byStatus.running} running · ${auditAfter.errors + flowAuditAfter.errors} audit errors · ${auditAfter.warnings + flowAuditAfter.warnings} audit warnings`,