fix(tasks): prune stale cron session registry entries

This commit is contained in:
brokemac79
2026-04-29 03:40:04 +01:00
committed by Peter Steinberger
parent e68eb601fa
commit 021f753f72
2 changed files with 271 additions and 4 deletions

View File

@@ -1,3 +1,5 @@
import fs from "node:fs/promises";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { RuntimeEnv } from "../runtime.js";
import {
@@ -10,6 +12,7 @@ import {
resetTaskRegistryForTests,
} from "../tasks/task-registry.js";
import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js";
import type { OpenClawTestState } from "../test-utils/openclaw-test-state.js";
import { tasksAuditCommand, tasksMaintenanceCommand } from "./tasks.js";
function createRuntime(): RuntimeEnv {
@@ -29,15 +32,17 @@ const zeroTaskAuditCounts = {
stale_running: 0,
};
async function withTaskCommandStateDir(run: () => Promise<void>): Promise<void> {
async function withTaskCommandStateDir(
run: (state: OpenClawTestState) => Promise<void>,
): Promise<void> {
await withOpenClawTestState(
{ layout: "state-only", prefix: "openclaw-tasks-command-" },
async () => {
async (state) => {
resetTaskRegistryDeliveryRuntimeForTests();
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
try {
await run();
await run(state);
} finally {
resetTaskRegistryDeliveryRuntimeForTests();
resetTaskRegistryForTests({ persist: false });
@@ -163,4 +168,113 @@ describe("tasks commands", () => {
expect(payload.auditAfter.taskFlows.byCode.stale_running).toBe(0);
});
});
it("applies a conservative session registry sweep for stale cron run sessions", async () => {
await withTaskCommandStateDir(async (state) => {
const now = Date.now();
vi.useFakeTimers();
vi.setSystemTime(now);
const sessionsDir = state.sessionsDir("main");
const storePath = path.join(sessionsDir, "sessions.json");
const old = now - 8 * 24 * 60 * 60_000;
await fs.mkdir(sessionsDir, { recursive: true });
await fs.writeFile(
storePath,
JSON.stringify(
{
"agent:main:cron:done-job:run:old-run": {
sessionId: "done-run",
updatedAt: old,
},
"agent:main:cron:running-job:run:old-run": {
sessionId: "running-run",
updatedAt: old,
},
"agent:main:cron:done-job:run:recent-run": {
sessionId: "recent-run",
updatedAt: now - 60_000,
},
"agent:main:telegram:dm:old": {
sessionId: "ordinary-old-session",
updatedAt: old,
},
},
null,
2,
),
"utf-8",
);
await state.writeJson("cron/jobs.json", {
version: 1,
jobs: [
{
id: "running-job",
name: "Running job",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
sessionKey: "cron:running-job",
wakeMode: "now",
payload: { kind: "agentTurn", message: "ping" },
delivery: { mode: "none" },
createdAtMs: now,
updatedAtMs: now,
state: {},
},
{
id: "done-job",
name: "Done job",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
sessionKey: "cron:done-job",
wakeMode: "now",
payload: { kind: "agentTurn", message: "ping" },
delivery: { mode: "none" },
createdAtMs: now,
updatedAtMs: now,
state: {},
},
],
});
await state.writeJson("cron/jobs-state.json", {
version: 1,
jobs: {
"running-job": {
updatedAtMs: now,
state: { runningAtMs: now - 5_000 },
},
"done-job": {
updatedAtMs: now,
state: {},
},
},
});
const runtime = createRuntime();
await tasksMaintenanceCommand({ json: true, apply: true }, runtime);
const payload = JSON.parse(String(vi.mocked(runtime.log).mock.calls[0]?.[0])) as {
maintenance: {
sessions: {
pruned: number;
runningCronJobs: number;
stores: Array<{ pruned: number; preservedRunning: number }>;
};
};
};
expect(payload.maintenance.sessions.pruned).toBe(1);
expect(payload.maintenance.sessions.runningCronJobs).toBe(1);
expect(payload.maintenance.sessions.stores[0]).toMatchObject({
pruned: 1,
preservedRunning: 1,
});
const updated = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<string, unknown>;
expect(updated["agent:main:cron:done-job:run:old-run"]).toBeUndefined();
expect(updated["agent:main:cron:running-job:run:old-run"]).toBeDefined();
expect(updated["agent:main:cron:done-job:run:recent-run"]).toBeDefined();
expect(updated["agent:main:telegram:dm:old"]).toBeDefined();
});
});
});

View File

@@ -1,7 +1,16 @@
import fs from "node:fs";
import { formatCliCommand } from "../cli/command-format.js";
import { getRuntimeConfig } from "../config/config.js";
import { resolveCronStorePath } from "../cron/store.js";
import {
loadSessionStore,
pruneStaleEntries,
resolveAllAgentSessionStoreTargetsSync,
updateSessionStore,
type SessionEntry,
} from "../config/sessions.js";
import { loadCronStoreSync, resolveCronStorePath } from "../cron/store.js";
import type { RuntimeEnv } from "../runtime.js";
import { parseAgentSessionKey } from "../sessions/session-key-utils.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { getTaskById, updateTaskNotifyPolicyById } from "../tasks/runtime-internal.js";
import { cancelDetachedTaskRunById } from "../tasks/task-executor.js";
@@ -44,6 +53,7 @@ const STATUS_PAD = 10;
const DELIVERY_PAD = 14;
const ID_PAD = 10;
const RUN_PAD = 10;
const SESSION_REGISTRY_RETENTION_MS = 7 * 24 * 60 * 60_000;
const info = theme.info;
@@ -62,6 +72,142 @@ function configureTaskMaintenanceFromConfig(): void {
});
}
type SessionRegistryMaintenanceStoreSummary = {
agentId: string;
storePath: string;
beforeCount: number;
afterCount: number;
pruned: number;
preservedRunning: number;
};
type SessionRegistryMaintenanceSummary = {
retentionMs: number;
runningCronJobs: number;
pruned: number;
stores: SessionRegistryMaintenanceStoreSummary[];
};
function parseCronRunSessionJobId(sessionKey: string): string | undefined {
const parsed = parseAgentSessionKey(sessionKey);
if (!parsed) {
return undefined;
}
return /^cron:([^:]+):run:[^:]+$/u.exec(parsed.rest)?.[1];
}
function readRunningCronJobIds(): Set<string> {
try {
const cronStorePath = resolveCronStorePath(getRuntimeConfig().cron?.store);
return new Set(
loadCronStoreSync(cronStorePath)
.jobs.filter((job) => typeof job.state?.runningAtMs === "number")
.map((job) => job.id.toLowerCase()),
);
} catch {
return new Set();
}
}
function buildSessionRegistryPreserveKeys(params: {
store: Record<string, SessionEntry>;
runningCronJobIds: ReadonlySet<string>;
}): { preserveKeys: Set<string>; preservedRunning: number } {
const preserveKeys = new Set<string>();
let preservedRunning = 0;
for (const key of Object.keys(params.store)) {
const jobId = parseCronRunSessionJobId(key);
if (!jobId) {
preserveKeys.add(key);
continue;
}
if (params.runningCronJobIds.has(jobId)) {
preserveKeys.add(key);
preservedRunning += 1;
}
}
return { preserveKeys, preservedRunning };
}
async function runSessionRegistryMaintenance(params: {
apply: boolean;
}): Promise<SessionRegistryMaintenanceSummary> {
const cfg = getRuntimeConfig();
const runningCronJobIds = readRunningCronJobIds();
const stores: SessionRegistryMaintenanceStoreSummary[] = [];
for (const target of resolveAllAgentSessionStoreTargetsSync(cfg)) {
if (!fs.existsSync(target.storePath)) {
stores.push({
agentId: target.agentId,
storePath: target.storePath,
beforeCount: 0,
afterCount: 0,
pruned: 0,
preservedRunning: 0,
});
continue;
}
const beforeStore = loadSessionStore(target.storePath, { skipCache: true });
const beforeCount = Object.keys(beforeStore).length;
const { preservedRunning } = buildSessionRegistryPreserveKeys({
store: beforeStore,
runningCronJobIds,
});
if (params.apply) {
const applied = await updateSessionStore(
target.storePath,
(store) => {
const { preserveKeys } = buildSessionRegistryPreserveKeys({
store,
runningCronJobIds,
});
const pruned = pruneStaleEntries(store, SESSION_REGISTRY_RETENTION_MS, {
log: false,
preserveKeys,
});
return {
pruned,
afterCount: Object.keys(store).length,
};
},
{ skipMaintenance: true },
);
stores.push({
agentId: target.agentId,
storePath: target.storePath,
beforeCount,
afterCount: applied.afterCount,
pruned: applied.pruned,
preservedRunning,
});
continue;
}
const previewStore = structuredClone(beforeStore);
const { preserveKeys } = buildSessionRegistryPreserveKeys({
store: previewStore,
runningCronJobIds,
});
const pruned = pruneStaleEntries(previewStore, SESSION_REGISTRY_RETENTION_MS, {
log: false,
preserveKeys,
});
stores.push({
agentId: target.agentId,
storePath: target.storePath,
beforeCount,
afterCount: Object.keys(previewStore).length,
pruned,
preservedRunning,
});
}
return {
retentionMs: SESSION_REGISTRY_RETENTION_MS,
runningCronJobs: runningCronJobIds.size,
pruned: stores.reduce((total, store) => total + store.pruned, 0),
stores,
};
}
function truncate(value: string, maxChars: number) {
if (value.length <= maxChars) {
return value;
@@ -517,6 +663,7 @@ export async function tasksMaintenanceCommand(
const flowMaintenance = opts.apply
? await runTaskFlowRegistryMaintenance()
: previewTaskFlowRegistryMaintenance();
const sessionMaintenance = await runSessionRegistryMaintenance({ apply: Boolean(opts.apply) });
const summary = getInspectableTaskRegistrySummary();
const auditAfter = opts.apply ? getInspectableTaskAuditSummary() : auditBefore;
const flowAuditAfter = opts.apply ? getInspectableTaskFlowAuditSummary() : flowAuditBefore;
@@ -529,6 +676,7 @@ export async function tasksMaintenanceCommand(
maintenance: {
tasks: taskMaintenance,
taskFlows: flowMaintenance,
sessions: sessionMaintenance,
},
tasks: summary,
auditBefore: {
@@ -552,6 +700,11 @@ export async function tasksMaintenanceCommand(
`Tasks maintenance (${opts.apply ? "applied" : "preview"}): tasks ${taskMaintenance.reconciled} reconcile · ${taskMaintenance.recovered} recovered · ${taskMaintenance.cleanupStamped} cleanup stamp · ${taskMaintenance.pruned} prune; task-flows ${flowMaintenance.reconciled} reconcile · ${flowMaintenance.pruned} prune`,
),
);
runtime.log(
info(
`Session registry: ${sessionMaintenance.pruned} prune · ${sessionMaintenance.runningCronJobs} running cron jobs`,
),
);
runtime.log(
info(
`${opts.apply ? "Tasks health after apply" : "Tasks health"}: ${summary.byStatus.queued} queued · ${summary.byStatus.running} running · ${auditAfter.errors + flowAuditAfter.errors} audit errors · ${auditAfter.warnings + flowAuditAfter.warnings} audit warnings`,