mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-25 22:09:33 +00:00
fix(tasks): preserve both cron-run session key shapes during maintenance (#96352)
* fix(tasks): preserve both cron-run session key shapes during maintenance
Session-registry maintenance keeps running cron jobs' session rows, but
readRunningCronJobIds built the preserve-set with job.id.toLowerCase() only.
Cron-run session keys carry two job-segment shapes: main-session runs use the
slugified segment (normalizeCronLaneSegment, e.g. "daily-report") while
default-isolated runs use the raw lowercased id ("daily report", built from
cron:${job.id} via toAgentStoreSessionKey, which lowercases but does not
slugify). The lowercase-only matcher preserved isolated runs but pruned
main-session runs of any non-slug job id (e.g. "Daily Report") as stale.
Preserve both shapes (raw lowercased id and slugified segment). This is
strictly more-preserving, so no live running cron session is dropped. Adds a
regression test seeding both a slug main-session run and a raw isolated run for
a non-slug job id, asserting both survive while a non-running job's run is still
pruned.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
* fix(tasks): match cron session keys to target shape
* fix(tasks): preserve active cron aliases across retargeting
* fix(tasks): retain explicit cron session aliases
---------
Co-authored-by: ly-wang19 <ly-wang19@users.noreply.github.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
This commit is contained in:
@@ -370,6 +370,125 @@ describe("tasks commands", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves both cron-run session key shapes for a running non-slug job id", async () => {
|
||||
await withTaskCommandStateDir(async (state) => {
|
||||
const now = Date.now();
|
||||
const old = now - 8 * 24 * 60 * 60_000;
|
||||
await saveCronStore(state.statePath("cron", "jobs.json"), {
|
||||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
id: "Daily Report",
|
||||
name: "Daily Report",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "isolated",
|
||||
sessionKey: "cron:daily-report",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "agentTurn", message: "ping" },
|
||||
delivery: { mode: "none" },
|
||||
createdAtMs: now,
|
||||
updatedAtMs: now,
|
||||
state: { runningAtMs: now - 5_000 },
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const sessionsDir = state.sessionsDir("main");
|
||||
const storePath = path.join(sessionsDir, "sessions.json");
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
// A running job can be retargeted after its session is created, so maintenance must preserve
|
||||
// both the raw and slugged historical shapes.
|
||||
const slugKey = "agent:main:cron:daily-report:run:old-run";
|
||||
const rawKey = "agent:main:cron:daily report:run:old-run";
|
||||
const retiredKey = "agent:main:cron:retired-job:run:old-run";
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
[slugKey]: { sessionId: "slug-run", updatedAt: old },
|
||||
[rawKey]: { sessionId: "raw-run", updatedAt: old },
|
||||
[retiredKey]: { sessionId: "retired-run", updatedAt: old },
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const runtime = createRuntime();
|
||||
await tasksMaintenanceCommand({ json: true, apply: true }, runtime);
|
||||
|
||||
const payload = readFirstJsonLog(runtime) as {
|
||||
maintenance: { sessions: { runningCronJobs: number } };
|
||||
};
|
||||
expect(payload.maintenance.sessions.runningCronJobs).toBe(1);
|
||||
const updated = JSON.parse(await fs.readFile(storePath, "utf8")) as Record<string, unknown>;
|
||||
expect(updated[slugKey]).toBeDefined();
|
||||
expect(updated[rawKey]).toBeDefined();
|
||||
expect(updated[retiredKey]).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves a running cron session with an explicit session key", async () => {
|
||||
await withTaskCommandStateDir(async (state) => {
|
||||
const now = Date.now();
|
||||
const old = now - 8 * 24 * 60 * 60_000;
|
||||
await saveCronStore(state.statePath("cron", "jobs.json"), {
|
||||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
id: "job-uuid",
|
||||
name: "Daily monitor",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "isolated",
|
||||
sessionKey: "cron:daily-monitor",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "agentTurn", message: "ping" },
|
||||
delivery: { mode: "none" },
|
||||
createdAtMs: now,
|
||||
updatedAtMs: now,
|
||||
state: { runningAtMs: now - 5_000 },
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const sessionsDir = state.sessionsDir("main");
|
||||
const storePath = path.join(sessionsDir, "sessions.json");
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
"agent:main:cron:daily-monitor:run:old-run": {
|
||||
sessionId: "explicit-run",
|
||||
updatedAt: old,
|
||||
},
|
||||
"agent:main:cron:job-uuid:run:old-run": {
|
||||
sessionId: "job-id-run",
|
||||
updatedAt: old,
|
||||
},
|
||||
"agent:main:cron:retired-job:run:old-run": {
|
||||
sessionId: "retired-run",
|
||||
updatedAt: old,
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const runtime = createRuntime();
|
||||
await tasksMaintenanceCommand({ json: true, apply: true }, runtime);
|
||||
|
||||
const updated = JSON.parse(await fs.readFile(storePath, "utf8")) as Record<string, unknown>;
|
||||
expect(updated["agent:main:cron:daily-monitor:run:old-run"]).toBeDefined();
|
||||
expect(updated["agent:main:cron:retired-job:run:old-run"]).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
it("does not build JSON-only diagnostics for text maintenance output", async () => {
|
||||
await withTaskCommandStateDir(async () => {
|
||||
const diagnosticsSpy = vi.spyOn(
|
||||
|
||||
@@ -11,6 +11,7 @@ import {
|
||||
resolveAllAgentSessionStoreTargetsSync,
|
||||
runSessionRegistryMaintenanceForStore,
|
||||
} from "../config/sessions.js";
|
||||
import { normalizeCronLaneSegment } from "../cron/service/task-runs.js";
|
||||
import { loadCronJobsStoreSync, resolveCronJobsStorePath } from "../cron/store.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import { getTaskById, updateTaskNotifyPolicyById } from "../tasks/runtime-internal.js";
|
||||
@@ -128,17 +129,36 @@ type SessionRegistryMaintenanceSummary = {
|
||||
stores: SessionRegistryMaintenanceStoreSummary[];
|
||||
};
|
||||
|
||||
function readRunningCronJobIds(): Set<string> {
|
||||
function resolveExplicitCronSessionSegment(sessionKey: string | undefined): string | undefined {
|
||||
const match = /^(?:agent:[^:]+:)?cron:([^:]+)$/u.exec(sessionKey?.trim() ?? "");
|
||||
return match?.[1]?.toLowerCase();
|
||||
}
|
||||
|
||||
function readRunningCronJobIds(): { ids: Set<string>; count: number } {
|
||||
try {
|
||||
const cronStorePath = resolveCronJobsStorePath(getRuntimeConfig().cron?.store);
|
||||
return new Set(
|
||||
loadCronJobsStoreSync(cronStorePath)
|
||||
.jobs.filter((job) => typeof job.state?.runningAtMs === "number")
|
||||
// Cron session keys are matched case-insensitively against job ids.
|
||||
.map((job) => job.id.toLowerCase()),
|
||||
const runningJobs = loadCronJobsStoreSync(cronStorePath).jobs.filter(
|
||||
(job) => typeof job.state?.runningAtMs === "number",
|
||||
);
|
||||
return {
|
||||
// A running job may have been retargeted after its session was created. Keep both historical
|
||||
// shapes; the registry has no producer metadata, so retaining an ambiguous alias is safer
|
||||
// than pruning a live transcript.
|
||||
ids: new Set(
|
||||
runningJobs.flatMap((job) => [
|
||||
job.id.toLowerCase(),
|
||||
normalizeCronLaneSegment(job.id, "job"),
|
||||
...(job.sessionTarget !== "main" && job.sessionKey
|
||||
? [resolveExplicitCronSessionSegment(job.sessionKey)].filter(
|
||||
(segment): segment is string => segment !== undefined,
|
||||
)
|
||||
: []),
|
||||
]),
|
||||
),
|
||||
count: runningJobs.length,
|
||||
};
|
||||
} catch {
|
||||
return new Set();
|
||||
return { ids: new Set(), count: 0 };
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,13 +166,13 @@ async function runSessionRegistryMaintenance(params: {
|
||||
apply: boolean;
|
||||
}): Promise<SessionRegistryMaintenanceSummary> {
|
||||
const cfg = getRuntimeConfig();
|
||||
const runningCronJobIds = readRunningCronJobIds();
|
||||
const runningCronJobs = readRunningCronJobIds();
|
||||
const stores: SessionRegistryMaintenanceStoreSummary[] = [];
|
||||
for (const target of resolveAllAgentSessionStoreTargetsSync(cfg)) {
|
||||
const result = await runSessionRegistryMaintenanceForStore({
|
||||
apply: params.apply,
|
||||
retentionMs: SESSION_REGISTRY_RETENTION_MS,
|
||||
runningCronJobIds,
|
||||
runningCronJobIds: runningCronJobs.ids,
|
||||
storePath: target.storePath,
|
||||
});
|
||||
stores.push({
|
||||
@@ -166,7 +186,7 @@ async function runSessionRegistryMaintenance(params: {
|
||||
}
|
||||
return {
|
||||
retentionMs: SESSION_REGISTRY_RETENTION_MS,
|
||||
runningCronJobs: runningCronJobIds.size,
|
||||
runningCronJobs: runningCronJobs.count,
|
||||
pruned: stores.reduce((total, store) => total + store.pruned, 0),
|
||||
stores,
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user