mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:30:42 +00:00
fix: recover stale cron task records
This commit is contained in:
@@ -80,6 +80,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Installer: load nvm before Node.js detection so `curl | bash` installs respect nvm-managed Node instead of stale system Node. Fixes #49556. Thanks @heavenlxj.
|
||||
- CLI/Volta: respawn raw `openclaw` CLI runs through the named `node` shim when the current Node executable resolves to `volta-shim`, avoiding direct shim execution failures in non-interactive shells. Fixes #68672. Thanks @sanchezm86.
|
||||
- Installer: warn when multiple npm global roots contain OpenClaw installs, showing active Node/npm/openclaw plus each install path and version so stale version-manager installs are visible. Fixes #40839. Thanks @zhixianio.
|
||||
- Cron/tasks: recover completed cron task ledger records from durable run logs and job state before marking them `lost`, reducing false `backing session missing` audit errors for isolated cron runs and keeping offline CLI audit from treating its empty local cron active-job set as authoritative. Fixes #71963.
|
||||
- Docker: copy patched dependency files into runtime images so downstream `pnpm install` layers keep working. Fixes #69224. Thanks @gucasbrg.
|
||||
- Agents/runtime: submit heartbeat, cron, and exec wakeups as transient runtime context instead of visible user prompts, keeping synthetic system work out of chat transcripts. Fixes #66496 and #66814. Thanks @jeades and @mandomaker.
|
||||
- Telegram: include native quote excerpts automatically for threaded replies and reply tags when the original Telegram text is available, without adding another config knob. Fixes #6975. Thanks @rex05ai.
|
||||
|
||||
@@ -51,7 +51,7 @@ Cron is the Gateway's built-in scheduler. It persists jobs, wakes the agent at t
|
||||
<a id="maintenance"></a>
|
||||
|
||||
<Note>
|
||||
Task reconciliation for cron is runtime-owned: an active cron task stays live while the cron runtime still tracks that job as running, even if an old child session row still exists. Once the runtime stops owning the job and the 5-minute grace window expires, maintenance can mark the task `lost`.
|
||||
Task reconciliation for cron is runtime-owned first, durable-history-backed second: an active cron task stays live while the cron runtime still tracks that job as running, even if an old child session row still exists. Once the runtime stops owning the job and the 5-minute grace window expires, maintenance checks persisted run logs and job state for the matching `cron:<jobId>:<startedAt>` run. If that durable history shows a terminal result, the task ledger is finalized from it; otherwise Gateway-owned maintenance can mark the task `lost`. Offline CLI audit can recover from durable history, but it does not treat its own empty in-process active-job set as proof that a Gateway-owned cron run is gone.
|
||||
</Note>
|
||||
|
||||
## Schedule types
|
||||
|
||||
@@ -25,8 +25,12 @@ Not every agent run creates a task. Heartbeat turns and normal interactive chat
|
||||
- Tasks are **records**, not schedulers — cron and heartbeat decide _when_ work runs, tasks track _what happened_.
|
||||
- ACP, subagents, all cron jobs, and CLI operations create tasks. Heartbeat turns do not.
|
||||
- Each task moves through `queued → running → terminal` (succeeded, failed, timed_out, cancelled, or lost).
|
||||
- Cron tasks stay live while the cron runtime still owns the job; chat-backed CLI tasks stay live only while their owning run context is still active.
|
||||
- Completion is push-driven: detached work can notify directly or wake the requester session/heartbeat when it finishes, so status polling loops are usually the wrong shape.
|
||||
- Cron tasks stay live while the cron runtime still owns the job; if the
|
||||
in-memory runtime state is gone, task maintenance first checks durable cron
|
||||
run history before marking a task lost.
|
||||
- Completion is push-driven: detached work can notify directly or wake the
|
||||
requester session/heartbeat when it finishes, so status polling loops are
|
||||
usually the wrong shape.
|
||||
- Isolated cron runs and subagent completions best-effort clean up tracked browser tabs/processes for their child session before final cleanup bookkeeping.
|
||||
- Isolated cron delivery suppresses stale interim parent replies while descendant subagent work is still draining, and it prefers final descendant output when that arrives before delivery.
|
||||
- Completion notifications are delivered directly to a channel or queued for the next heartbeat.
|
||||
@@ -143,8 +147,14 @@ Agent run completion is authoritative for active task records. A successful deta
|
||||
|
||||
- ACP tasks: backing ACP child session metadata disappeared.
|
||||
- Subagent tasks: backing child session disappeared from the target agent store.
|
||||
- Cron tasks: the cron runtime no longer tracks the job as active.
|
||||
- CLI tasks: isolated child-session tasks use the child session; chat-backed CLI tasks use the live run context instead, so lingering channel/group/direct session rows do not keep them alive. Gateway-backed `openclaw agent` runs also finalize from their run result, so completed runs do not sit active until the sweeper marks them `lost`.
|
||||
- Cron tasks: the cron runtime no longer tracks the job as active and durable
|
||||
cron run history does not show a terminal result for that run. Offline CLI
|
||||
audit does not treat its own empty in-process cron runtime state as authority.
|
||||
- CLI tasks: isolated child-session tasks use the child session; chat-backed
|
||||
CLI tasks use the live run context instead, so lingering
|
||||
channel/group/direct session rows do not keep them alive. Gateway-backed
|
||||
`openclaw agent` runs also finalize from their run result, so completed runs
|
||||
do not sit active until the sweeper marks them `lost`.
|
||||
|
||||
## Delivery and notifications
|
||||
|
||||
@@ -236,7 +246,7 @@ openclaw tasks notify <lookup> state_changes
|
||||
Reconciliation is runtime-aware:
|
||||
|
||||
- ACP/subagent tasks check their backing child session.
|
||||
- Cron tasks check whether the cron runtime still owns the job.
|
||||
- Cron tasks check whether the cron runtime still owns the job, then recover terminal status from persisted cron run logs/job state before falling back to `lost`. Only the Gateway process is authoritative for the in-memory cron active-job set; offline CLI audit uses durable history but does not mark a cron task lost solely because that local Set is empty.
|
||||
- Chat-backed CLI tasks check the owning live run context, not just the chat session row.
|
||||
|
||||
Completion cleanup is also runtime-aware:
|
||||
|
||||
@@ -84,6 +84,10 @@ openclaw tasks maintenance [--apply] [--json]
|
||||
```
|
||||
|
||||
Previews or applies task and Task Flow reconciliation, cleanup stamping, and pruning.
|
||||
For cron tasks, reconciliation uses persisted run logs/job state before marking an
|
||||
old active task `lost`, so completed cron runs do not become false audit errors
|
||||
just because the in-memory Gateway runtime state is gone. Offline CLI audit is
|
||||
not authoritative for the Gateway's process-local cron active-job set.
|
||||
|
||||
### `flow`
|
||||
|
||||
|
||||
@@ -58,6 +58,7 @@ vi.mock("../infra/system-events.js", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("../tasks/task-registry.maintenance.js", () => ({
|
||||
configureTaskRegistryMaintenance: vi.fn(),
|
||||
getInspectableTaskRegistrySummary: vi.fn(() => ({
|
||||
total: 0,
|
||||
active: 0,
|
||||
|
||||
@@ -4,6 +4,7 @@ import { resolveStorePath } from "../config/sessions/paths.js";
|
||||
import { readSessionStoreReadOnly } from "../config/sessions/store-read.js";
|
||||
import { resolveSessionTotalTokens, type SessionEntry } from "../config/sessions/types.js";
|
||||
import type { OpenClawConfig } from "../config/types.js";
|
||||
import { resolveCronStorePath } from "../cron/store.js";
|
||||
import { listGatewayAgentsBasic } from "../gateway/agent-list.js";
|
||||
import { resolveHeartbeatSummaryForAgent } from "../infra/heartbeat-summary.js";
|
||||
import { peekSystemEvents } from "../infra/system-events.js";
|
||||
@@ -151,6 +152,9 @@ export async function getStatusSummary(
|
||||
const mainSessionKey = resolveMainSessionKey(cfg);
|
||||
const queuedSystemEvents = peekSystemEvents(mainSessionKey);
|
||||
const taskMaintenanceModule = await loadTaskRegistryMaintenanceModule();
|
||||
taskMaintenanceModule.configureTaskRegistryMaintenance({
|
||||
cronStorePath: resolveCronStorePath(cfg.cron?.store),
|
||||
});
|
||||
const tasks = taskMaintenanceModule.getInspectableTaskRegistrySummary();
|
||||
const taskAudit = taskMaintenanceModule.getInspectableTaskAuditSummary();
|
||||
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { resolveCronStorePath } from "../cron/store.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import { normalizeOptionalString } from "../shared/string-coerce.js";
|
||||
import { getTaskById, updateTaskNotifyPolicyById } from "../tasks/runtime-internal.js";
|
||||
@@ -24,6 +26,7 @@ import { compareTaskAuditFindingSortKeys } from "../tasks/task-registry.audit.sh
|
||||
import {
|
||||
getInspectableTaskAuditSummary,
|
||||
getInspectableTaskRegistrySummary,
|
||||
configureTaskRegistryMaintenance,
|
||||
previewTaskRegistryMaintenance,
|
||||
runTaskRegistryMaintenance,
|
||||
} from "../tasks/task-registry.maintenance.js";
|
||||
@@ -44,10 +47,16 @@ const RUN_PAD = 10;
|
||||
const info = theme.info;
|
||||
|
||||
async function loadTaskCancelConfig() {
|
||||
const { loadConfig } = await import("../config/config.js");
|
||||
return loadConfig();
|
||||
}
|
||||
|
||||
function configureTaskMaintenanceFromConfig(): void {
|
||||
const cfg = loadConfig();
|
||||
configureTaskRegistryMaintenance({
|
||||
cronStorePath: resolveCronStorePath(cfg.cron?.store),
|
||||
});
|
||||
}
|
||||
|
||||
function truncate(value: string, maxChars: number) {
|
||||
if (value.length <= maxChars) {
|
||||
return value;
|
||||
@@ -417,6 +426,7 @@ export async function tasksAuditCommand(
|
||||
},
|
||||
runtime: RuntimeEnv,
|
||||
) {
|
||||
configureTaskMaintenanceFromConfig();
|
||||
const severityFilter = opts.severity?.trim() as TaskSystemAuditSeverity | undefined;
|
||||
const codeFilter = opts.code?.trim() as TaskSystemAuditCode | undefined;
|
||||
const { allFindings, filteredFindings, taskFindings, summary } = toSystemAuditFindings({
|
||||
@@ -491,6 +501,7 @@ export async function tasksMaintenanceCommand(
|
||||
opts: { json?: boolean; apply?: boolean },
|
||||
runtime: RuntimeEnv,
|
||||
) {
|
||||
configureTaskMaintenanceFromConfig();
|
||||
const auditBefore = getInspectableTaskAuditSummary();
|
||||
const flowAuditBefore = getInspectableTaskFlowAuditSummary();
|
||||
const taskMaintenance = opts.apply
|
||||
|
||||
@@ -9,6 +9,7 @@ import {
|
||||
getPendingCronRunLogWriteCountForTests,
|
||||
readCronRunLogEntries,
|
||||
readCronRunLogEntriesPage,
|
||||
readCronRunLogEntriesSync,
|
||||
resolveCronRunLogPruneOptions,
|
||||
resolveCronRunLogPath,
|
||||
} from "./run-log.js";
|
||||
@@ -96,6 +97,36 @@ describe("cron run log", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("reads run-log entries synchronously for task reconciliation", async () => {
|
||||
await withRunLogDir("openclaw-cron-log-sync-", async (dir) => {
|
||||
const logPath = path.join(dir, "runs", "job-1.jsonl");
|
||||
await appendCronRunLog(logPath, {
|
||||
ts: 1000,
|
||||
jobId: "job-1",
|
||||
action: "finished",
|
||||
status: "ok",
|
||||
runAtMs: 900,
|
||||
durationMs: 100,
|
||||
});
|
||||
await appendCronRunLog(logPath, {
|
||||
ts: 2000,
|
||||
jobId: "job-2",
|
||||
action: "finished",
|
||||
status: "error",
|
||||
});
|
||||
|
||||
expect(readCronRunLogEntriesSync(logPath, { jobId: "job-1" })).toEqual([
|
||||
expect.objectContaining({
|
||||
jobId: "job-1",
|
||||
status: "ok",
|
||||
runAtMs: 900,
|
||||
durationMs: 100,
|
||||
}),
|
||||
]);
|
||||
expect(readCronRunLogEntriesSync(path.join(dir, "runs", "missing.jsonl"))).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
it.skipIf(process.platform === "win32")(
|
||||
"writes run log files with secure permissions",
|
||||
async () => {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { randomBytes } from "node:crypto";
|
||||
import fsSync from "node:fs";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { parseByteSize } from "../cli/parse-bytes.js";
|
||||
@@ -198,6 +199,23 @@ export async function readCronRunLogEntries(
|
||||
return page.entries.toReversed();
|
||||
}
|
||||
|
||||
export function readCronRunLogEntriesSync(
|
||||
filePath: string,
|
||||
opts?: { limit?: number; jobId?: string },
|
||||
): CronRunLogEntry[] {
|
||||
const limit = Math.max(1, Math.min(5000, Math.floor(opts?.limit ?? 200)));
|
||||
let raw: string;
|
||||
try {
|
||||
raw = fsSync.readFileSync(path.resolve(filePath), "utf-8");
|
||||
} catch (error) {
|
||||
if (typeof error === "object" && error !== null && "code" in error && error.code === "ENOENT") {
|
||||
return [];
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
return parseAllRunLogEntries(raw, { jobId: opts?.jobId }).slice(-limit);
|
||||
}
|
||||
|
||||
function normalizeRunStatusFilter(status?: string): CronRunLogStatusFilter {
|
||||
if (status === "ok" || status === "error" || status === "skipped" || status === "all") {
|
||||
return status;
|
||||
|
||||
@@ -3,7 +3,7 @@ import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { setTimeout as scheduleNativeTimeout } from "node:timers";
|
||||
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { loadCronStore, resolveCronStorePath, saveCronStore } from "./store.js";
|
||||
import { loadCronStore, loadCronStoreSync, resolveCronStorePath, saveCronStore } from "./store.js";
|
||||
import type { CronStoreFile } from "./types.js";
|
||||
|
||||
let fixtureRoot = "";
|
||||
@@ -125,6 +125,19 @@ describe("cron store", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("loads split cron state synchronously for task reconciliation", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
await saveCronStore(storePath, makeStore("job-sync", true));
|
||||
|
||||
const loaded = loadCronStoreSync(storePath);
|
||||
|
||||
expect(loaded.jobs[0]).toMatchObject({
|
||||
id: "job-sync",
|
||||
state: expect.any(Object),
|
||||
updatedAtMs: expect.any(Number),
|
||||
});
|
||||
});
|
||||
|
||||
it("does not create a backup file when saving unchanged content", async () => {
|
||||
const store = await makeStorePath();
|
||||
const payload = makeStore("job-1", true);
|
||||
|
||||
@@ -114,6 +114,39 @@ async function loadStateFile(statePath: string): Promise<CronStateFile | null> {
|
||||
}
|
||||
}
|
||||
|
||||
function loadStateFileSync(statePath: string): CronStateFile | null {
|
||||
let raw: string;
|
||||
try {
|
||||
raw = fs.readFileSync(statePath, "utf-8");
|
||||
} catch (err) {
|
||||
if ((err as { code?: unknown })?.code === "ENOENT") {
|
||||
return null;
|
||||
}
|
||||
throw new Error(`Failed to read cron state at ${statePath}: ${String(err)}`, {
|
||||
cause: err,
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
const parsed = parseJsonWithJson5Fallback(raw);
|
||||
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
|
||||
return null;
|
||||
}
|
||||
const record = parsed as Record<string, unknown>;
|
||||
if (
|
||||
record.version !== 1 ||
|
||||
typeof record.jobs !== "object" ||
|
||||
record.jobs === null ||
|
||||
Array.isArray(record.jobs)
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
return { version: 1, jobs: record.jobs as Record<string, CronStateFileEntry> };
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function hasInlineState(jobs: Array<Record<string, unknown> | null | undefined>): boolean {
|
||||
return jobs.some(
|
||||
(job) =>
|
||||
@@ -219,6 +252,60 @@ export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
|
||||
}
|
||||
}
|
||||
|
||||
export function loadCronStoreSync(storePath: string): CronStoreFile {
|
||||
try {
|
||||
const raw = fs.readFileSync(storePath, "utf-8");
|
||||
let parsed: unknown;
|
||||
try {
|
||||
parsed = parseJsonWithJson5Fallback(raw);
|
||||
} catch (err) {
|
||||
throw new Error(`Failed to parse cron store at ${storePath}: ${String(err)}`, {
|
||||
cause: err,
|
||||
});
|
||||
}
|
||||
const parsedRecord =
|
||||
parsed && typeof parsed === "object" && !Array.isArray(parsed)
|
||||
? (parsed as Record<string, unknown>)
|
||||
: {};
|
||||
const jobs = Array.isArray(parsedRecord.jobs) ? (parsedRecord.jobs as never[]) : [];
|
||||
const store = {
|
||||
version: 1 as const,
|
||||
jobs: jobs.filter(Boolean) as never as CronStoreFile["jobs"],
|
||||
};
|
||||
|
||||
const stateFile = loadStateFileSync(resolveStatePath(storePath));
|
||||
const hasLegacyInlineState =
|
||||
!stateFile && hasInlineState(jobs as unknown as Array<Record<string, unknown>>);
|
||||
|
||||
if (stateFile) {
|
||||
for (const job of store.jobs) {
|
||||
const entry = stateFile.jobs[job.id];
|
||||
if (entry) {
|
||||
job.updatedAtMs = resolveUpdatedAtMs(job, entry.updatedAtMs);
|
||||
job.state = (entry.state ?? {}) as never;
|
||||
} else {
|
||||
backfillMissingRuntimeFields(job);
|
||||
}
|
||||
}
|
||||
} else if (!hasLegacyInlineState) {
|
||||
for (const job of store.jobs) {
|
||||
backfillMissingRuntimeFields(job);
|
||||
}
|
||||
}
|
||||
|
||||
for (const job of store.jobs) {
|
||||
ensureJobStateObject(job);
|
||||
}
|
||||
|
||||
return store;
|
||||
} catch (err) {
|
||||
if ((err as { code?: unknown })?.code === "ENOENT") {
|
||||
return { version: 1, jobs: [] };
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
type SaveCronStoreOptions = {
|
||||
skipBackup?: boolean;
|
||||
};
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { registerSkillsChangeListener } from "../agents/skills/refresh.js";
|
||||
import type { GatewayTailscaleMode } from "../config/types.gateway.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { resolveCronStorePath } from "../cron/store.js";
|
||||
import { getMachineDisplayName } from "../infra/machine-name.js";
|
||||
import {
|
||||
primeRemoteSkillsCache,
|
||||
@@ -8,7 +9,10 @@ import {
|
||||
setSkillsRemoteRegistry,
|
||||
} from "../infra/skills-remote.js";
|
||||
import type { PluginRegistry } from "../plugins/registry-types.js";
|
||||
import { startTaskRegistryMaintenance } from "../tasks/task-registry.maintenance.js";
|
||||
import {
|
||||
configureTaskRegistryMaintenance,
|
||||
startTaskRegistryMaintenance,
|
||||
} from "../tasks/task-registry.maintenance.js";
|
||||
import { startGatewayDiscovery } from "./server-discovery-runtime.js";
|
||||
import { startGatewayMaintenanceTimers } from "./server-maintenance.js";
|
||||
|
||||
@@ -77,6 +81,10 @@ export async function startGatewayEarlyRuntime(params: {
|
||||
if (!params.minimalTestGateway) {
|
||||
setSkillsRemoteRegistry(params.nodeRegistry);
|
||||
void primeRemoteSkillsCache();
|
||||
configureTaskRegistryMaintenance({
|
||||
cronStorePath: resolveCronStorePath(params.cfgAtStart.cron?.store),
|
||||
cronRuntimeAuthoritative: true,
|
||||
});
|
||||
startTaskRegistryMaintenance();
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import type { AcpSessionStoreEntry } from "../acp/runtime/session-meta.js";
|
||||
import type { SessionEntry } from "../config/sessions.js";
|
||||
import type { CronRunLogEntry } from "../cron/run-log.js";
|
||||
import type { CronStoreFile } from "../cron/types.js";
|
||||
import type { ParsedAgentSessionKey } from "../routing/session-key.js";
|
||||
import {
|
||||
resetDetachedTaskLifecycleRuntimeForTests,
|
||||
@@ -9,6 +11,7 @@ import {
|
||||
} from "./detached-task-runtime.js";
|
||||
import {
|
||||
previewTaskRegistryMaintenance,
|
||||
reconcileInspectableTasks,
|
||||
resetTaskRegistryMaintenanceRuntimeForTests,
|
||||
runTaskRegistryMaintenance,
|
||||
setTaskRegistryMaintenanceRuntimeForTests,
|
||||
@@ -53,11 +56,15 @@ function createTaskRegistryMaintenanceHarness(params: {
|
||||
acpEntry?: AcpSessionStoreEntry["entry"];
|
||||
activeCronJobIds?: string[];
|
||||
activeRunIds?: string[];
|
||||
cronStore?: CronStoreFile;
|
||||
cronRunLogEntries?: Record<string, CronRunLogEntry[]>;
|
||||
cronRuntimeAuthoritative?: boolean;
|
||||
}) {
|
||||
const sessionStore = params.sessionStore ?? {};
|
||||
const acpEntry = params.acpEntry;
|
||||
const activeCronJobIds = new Set(params.activeCronJobIds ?? []);
|
||||
const activeRunIds = new Set(params.activeRunIds ?? []);
|
||||
const cronRunLogEntries = params.cronRunLogEntries ?? {};
|
||||
const currentTasks = new Map(params.tasks.map((task) => [task.taskId, { ...task }]));
|
||||
|
||||
const runtime: TaskRegistryMaintenanceRuntime = {
|
||||
@@ -113,6 +120,24 @@ function createTaskRegistryMaintenanceHarness(params: {
|
||||
currentTasks.set(patch.taskId, next);
|
||||
return next;
|
||||
},
|
||||
markTaskTerminalById: (patch) => {
|
||||
const current = currentTasks.get(patch.taskId);
|
||||
if (!current) {
|
||||
return null;
|
||||
}
|
||||
const next = {
|
||||
...current,
|
||||
status: patch.status,
|
||||
endedAt: patch.endedAt,
|
||||
lastEventAt: patch.lastEventAt ?? patch.endedAt,
|
||||
...(patch.error !== undefined ? { error: patch.error } : {}),
|
||||
...(patch.terminalSummary !== undefined
|
||||
? { terminalSummary: patch.terminalSummary ?? undefined }
|
||||
: {}),
|
||||
} satisfies TaskRecord;
|
||||
currentTasks.set(patch.taskId, next);
|
||||
return next;
|
||||
},
|
||||
maybeDeliverTaskTerminalUpdate: async () => null,
|
||||
resolveTaskForLookupToken: () => undefined,
|
||||
setTaskCleanupAfterById: (patch) => {
|
||||
@@ -124,6 +149,11 @@ function createTaskRegistryMaintenanceHarness(params: {
|
||||
currentTasks.set(patch.taskId, next);
|
||||
return next;
|
||||
},
|
||||
isCronRuntimeAuthoritative: () => params.cronRuntimeAuthoritative ?? true,
|
||||
resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json",
|
||||
loadCronStoreSync: () => params.cronStore ?? { version: 1, jobs: [] },
|
||||
resolveCronRunLogPath: ({ jobId }) => jobId,
|
||||
readCronRunLogEntriesSync: (jobId) => cronRunLogEntries[jobId] ?? [],
|
||||
};
|
||||
|
||||
setTaskRegistryMaintenanceRuntimeForTests(runtime);
|
||||
@@ -164,6 +194,112 @@ describe("task-registry maintenance issue #60299", () => {
|
||||
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" });
|
||||
});
|
||||
|
||||
it("does not mark cron tasks lost when the current process is not the cron runtime authority", async () => {
|
||||
const task = makeStaleTask({
|
||||
runtime: "cron",
|
||||
sourceId: "cron-job-offline-audit",
|
||||
childSessionKey: undefined,
|
||||
});
|
||||
|
||||
const { currentTasks } = createTaskRegistryMaintenanceHarness({
|
||||
tasks: [task],
|
||||
cronRuntimeAuthoritative: false,
|
||||
});
|
||||
|
||||
expect(previewTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 });
|
||||
expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 });
|
||||
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" });
|
||||
});
|
||||
|
||||
it("recovers finished cron tasks from durable run logs before marking them lost", async () => {
|
||||
const startedAt = Date.now() - GRACE_EXPIRED_MS;
|
||||
const task = makeStaleTask({
|
||||
runtime: "cron",
|
||||
sourceId: "cron-job-run-log-ok",
|
||||
runId: `cron:cron-job-run-log-ok:${startedAt}`,
|
||||
startedAt,
|
||||
lastEventAt: startedAt,
|
||||
});
|
||||
|
||||
const { currentTasks } = createTaskRegistryMaintenanceHarness({
|
||||
tasks: [task],
|
||||
cronRunLogEntries: {
|
||||
"cron-job-run-log-ok": [
|
||||
{
|
||||
ts: startedAt + 1250,
|
||||
jobId: "cron-job-run-log-ok",
|
||||
action: "finished",
|
||||
status: "ok",
|
||||
summary: "done",
|
||||
runAtMs: startedAt,
|
||||
durationMs: 1250,
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
|
||||
expect(reconcileInspectableTasks()).toEqual([
|
||||
expect.objectContaining({
|
||||
taskId: task.taskId,
|
||||
status: "succeeded",
|
||||
endedAt: startedAt + 1250,
|
||||
terminalSummary: "done",
|
||||
}),
|
||||
]);
|
||||
expect(previewTaskRegistryMaintenance()).toMatchObject({ reconciled: 0, recovered: 1 });
|
||||
expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0, recovered: 1 });
|
||||
expect(currentTasks.get(task.taskId)).toMatchObject({
|
||||
status: "succeeded",
|
||||
endedAt: startedAt + 1250,
|
||||
terminalSummary: "done",
|
||||
});
|
||||
});
|
||||
|
||||
it("recovers interrupted cron tasks from durable cron job state when run logs are absent", async () => {
|
||||
const startedAt = Date.now() - GRACE_EXPIRED_MS;
|
||||
const task = makeStaleTask({
|
||||
runtime: "cron",
|
||||
sourceId: "cron-job-state-error",
|
||||
runId: `cron:cron-job-state-error:${startedAt}`,
|
||||
startedAt,
|
||||
lastEventAt: startedAt,
|
||||
});
|
||||
|
||||
const { currentTasks } = createTaskRegistryMaintenanceHarness({
|
||||
tasks: [task],
|
||||
cronStore: {
|
||||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
id: "cron-job-state-error",
|
||||
name: "state error",
|
||||
enabled: true,
|
||||
createdAtMs: startedAt - 60_000,
|
||||
updatedAtMs: startedAt,
|
||||
schedule: { kind: "every", everyMs: 60_000, anchorMs: startedAt - 60_000 },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "agentTurn", message: "work" },
|
||||
state: {
|
||||
lastRunAtMs: startedAt,
|
||||
lastRunStatus: "error",
|
||||
lastError: "cron: job interrupted by gateway restart",
|
||||
lastDurationMs: 5000,
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
|
||||
expect(previewTaskRegistryMaintenance()).toMatchObject({ reconciled: 0, recovered: 1 });
|
||||
expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0, recovered: 1 });
|
||||
expect(currentTasks.get(task.taskId)).toMatchObject({
|
||||
status: "failed",
|
||||
endedAt: startedAt + 5000,
|
||||
error: "cron: job interrupted by gateway restart",
|
||||
});
|
||||
});
|
||||
|
||||
it("marks chat-backed cli tasks lost after the owning run context disappears", async () => {
|
||||
const channelKey = "agent:main:workspace:channel:C1234567890";
|
||||
const task = makeStaleTask({
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
import { readAcpSessionEntry } from "../acp/runtime/session-meta.js";
|
||||
import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
|
||||
import { isCronJobActive } from "../cron/active-jobs.js";
|
||||
import { readCronRunLogEntriesSync, resolveCronRunLogPath } from "../cron/run-log.js";
|
||||
import type { CronRunLogEntry } from "../cron/run-log.js";
|
||||
import { loadCronStoreSync, resolveCronStorePath } from "../cron/store.js";
|
||||
import type { CronJob, CronStoreFile } from "../cron/types.js";
|
||||
import { getAgentRunContext } from "../infra/agent-events.js";
|
||||
import { parseAgentSessionKey } from "../routing/session-key.js";
|
||||
import { deriveSessionChatType } from "../sessions/session-chat-type.js";
|
||||
@@ -12,6 +16,7 @@ import {
|
||||
getTaskById,
|
||||
listTaskRecords,
|
||||
markTaskLostById,
|
||||
markTaskTerminalById,
|
||||
maybeDeliverTaskTerminalUpdate,
|
||||
resolveTaskForLookupToken,
|
||||
setTaskCleanupAfterById,
|
||||
@@ -23,7 +28,7 @@ import {
|
||||
} from "./task-registry.audit.js";
|
||||
import type { TaskAuditSummary } from "./task-registry.audit.js";
|
||||
import { summarizeTaskRecords } from "./task-registry.summary.js";
|
||||
import type { TaskRecord, TaskRegistrySummary } from "./task-registry.types.js";
|
||||
import type { TaskRecord, TaskRegistrySummary, TaskStatus } from "./task-registry.types.js";
|
||||
|
||||
const TASK_RECONCILE_GRACE_MS = 5 * 60_000;
|
||||
const TASK_RETENTION_MS = 7 * 24 * 60 * 60_000;
|
||||
@@ -38,6 +43,8 @@ const SWEEP_YIELD_BATCH_SIZE = 25;
|
||||
let sweeper: NodeJS.Timeout | null = null;
|
||||
let deferredSweep: NodeJS.Timeout | null = null;
|
||||
let sweepInProgress = false;
|
||||
let configuredCronStorePath: string | undefined;
|
||||
let configuredCronRuntimeAuthoritative = false;
|
||||
|
||||
type TaskRegistryMaintenanceRuntime = {
|
||||
readAcpSessionEntry: typeof readAcpSessionEntry;
|
||||
@@ -51,9 +58,15 @@ type TaskRegistryMaintenanceRuntime = {
|
||||
getTaskById: typeof getTaskById;
|
||||
listTaskRecords: typeof listTaskRecords;
|
||||
markTaskLostById: typeof markTaskLostById;
|
||||
markTaskTerminalById: typeof markTaskTerminalById;
|
||||
maybeDeliverTaskTerminalUpdate: typeof maybeDeliverTaskTerminalUpdate;
|
||||
resolveTaskForLookupToken: typeof resolveTaskForLookupToken;
|
||||
setTaskCleanupAfterById: typeof setTaskCleanupAfterById;
|
||||
isCronRuntimeAuthoritative: () => boolean;
|
||||
resolveCronStorePath: typeof resolveCronStorePath;
|
||||
loadCronStoreSync: typeof loadCronStoreSync;
|
||||
resolveCronRunLogPath: typeof resolveCronRunLogPath;
|
||||
readCronRunLogEntriesSync: typeof readCronRunLogEntriesSync;
|
||||
};
|
||||
|
||||
const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = {
|
||||
@@ -68,9 +81,15 @@ const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = {
|
||||
getTaskById,
|
||||
listTaskRecords,
|
||||
markTaskLostById,
|
||||
markTaskTerminalById,
|
||||
maybeDeliverTaskTerminalUpdate,
|
||||
resolveTaskForLookupToken,
|
||||
setTaskCleanupAfterById,
|
||||
isCronRuntimeAuthoritative: () => configuredCronRuntimeAuthoritative,
|
||||
resolveCronStorePath: () => configuredCronStorePath ?? resolveCronStorePath(),
|
||||
loadCronStoreSync,
|
||||
resolveCronRunLogPath,
|
||||
readCronRunLogEntriesSync,
|
||||
};
|
||||
|
||||
let taskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime =
|
||||
@@ -83,6 +102,32 @@ export type TaskRegistryMaintenanceSummary = {
|
||||
pruned: number;
|
||||
};
|
||||
|
||||
type CronExecutionId = {
|
||||
jobId: string;
|
||||
startedAt: number;
|
||||
};
|
||||
|
||||
type CronTerminalRecovery = {
|
||||
status: Extract<TaskStatus, "succeeded" | "failed" | "timed_out">;
|
||||
endedAt: number;
|
||||
lastEventAt: number;
|
||||
error?: string;
|
||||
terminalSummary?: string;
|
||||
};
|
||||
|
||||
type CronRecoveryContext = {
|
||||
storePath: string;
|
||||
store?: CronStoreFile | null;
|
||||
runLogsByJobId: Map<string, CronRunLogEntry[]>;
|
||||
};
|
||||
|
||||
function createCronRecoveryContext(): CronRecoveryContext {
|
||||
return {
|
||||
storePath: taskRegistryMaintenanceRuntime.resolveCronStorePath(),
|
||||
runLogsByJobId: new Map<string, CronRunLogEntry[]>(),
|
||||
};
|
||||
}
|
||||
|
||||
function findSessionEntryByKey(store: Record<string, unknown>, sessionKey: string): unknown {
|
||||
const direct = store[sessionKey];
|
||||
if (direct) {
|
||||
@@ -110,6 +155,142 @@ function hasLostGraceExpired(task: TaskRecord, now: number): boolean {
|
||||
return now - referenceAt >= TASK_RECONCILE_GRACE_MS;
|
||||
}
|
||||
|
||||
function parseCronExecutionId(task: TaskRecord): CronExecutionId | undefined {
|
||||
const runId = task.runId?.trim();
|
||||
if (!runId?.startsWith("cron:")) {
|
||||
return undefined;
|
||||
}
|
||||
const separator = runId.lastIndexOf(":");
|
||||
if (separator <= "cron:".length) {
|
||||
return undefined;
|
||||
}
|
||||
const startedAt = Number(runId.slice(separator + 1));
|
||||
if (!Number.isFinite(startedAt)) {
|
||||
return undefined;
|
||||
}
|
||||
const jobId = runId.slice("cron:".length, separator).trim();
|
||||
if (!jobId || (task.sourceId?.trim() && task.sourceId.trim() !== jobId)) {
|
||||
return undefined;
|
||||
}
|
||||
return { jobId, startedAt };
|
||||
}
|
||||
|
||||
function isTimeoutCronError(error: string | undefined): boolean {
|
||||
return error === "cron: job execution timed out";
|
||||
}
|
||||
|
||||
function mapCronTerminalStatus(status: unknown, error?: string): CronTerminalRecovery["status"] {
|
||||
if (status === "ok" || status === "skipped") {
|
||||
return "succeeded";
|
||||
}
|
||||
return isTimeoutCronError(error) ? "timed_out" : "failed";
|
||||
}
|
||||
|
||||
function getCronRunLogEntries(context: CronRecoveryContext, jobId: string): CronRunLogEntry[] {
|
||||
const cached = context.runLogsByJobId.get(jobId);
|
||||
if (cached) {
|
||||
return cached;
|
||||
}
|
||||
let entries: CronRunLogEntry[] = [];
|
||||
try {
|
||||
const logPath = taskRegistryMaintenanceRuntime.resolveCronRunLogPath({
|
||||
storePath: context.storePath,
|
||||
jobId,
|
||||
});
|
||||
entries = taskRegistryMaintenanceRuntime.readCronRunLogEntriesSync(logPath, {
|
||||
jobId,
|
||||
limit: 5000,
|
||||
});
|
||||
} catch {
|
||||
entries = [];
|
||||
}
|
||||
context.runLogsByJobId.set(jobId, entries);
|
||||
return entries;
|
||||
}
|
||||
|
||||
function getCronStore(context: CronRecoveryContext): CronStoreFile | null {
|
||||
if (context.store !== undefined) {
|
||||
return context.store;
|
||||
}
|
||||
try {
|
||||
context.store = taskRegistryMaintenanceRuntime.loadCronStoreSync(context.storePath);
|
||||
} catch {
|
||||
context.store = null;
|
||||
}
|
||||
return context.store;
|
||||
}
|
||||
|
||||
function resolveCronRunLogRecovery(
|
||||
execution: CronExecutionId,
|
||||
context: CronRecoveryContext,
|
||||
): CronTerminalRecovery | undefined {
|
||||
const entries = getCronRunLogEntries(context, execution.jobId);
|
||||
const entry = entries.findLast(
|
||||
(candidate) =>
|
||||
candidate.jobId === execution.jobId &&
|
||||
candidate.action === "finished" &&
|
||||
candidate.runAtMs === execution.startedAt &&
|
||||
(candidate.status === "ok" || candidate.status === "skipped" || candidate.status === "error"),
|
||||
);
|
||||
if (!entry) {
|
||||
return undefined;
|
||||
}
|
||||
const durationMs =
|
||||
typeof entry.durationMs === "number" && Number.isFinite(entry.durationMs)
|
||||
? Math.max(0, entry.durationMs)
|
||||
: undefined;
|
||||
const endedAt = durationMs === undefined ? entry.ts : execution.startedAt + durationMs;
|
||||
return {
|
||||
status: mapCronTerminalStatus(entry.status, entry.error),
|
||||
endedAt,
|
||||
lastEventAt: endedAt,
|
||||
...(entry.error !== undefined ? { error: entry.error } : {}),
|
||||
...(entry.summary !== undefined ? { terminalSummary: entry.summary } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function resolveCronJobStateRecovery(
|
||||
execution: CronExecutionId,
|
||||
context: CronRecoveryContext,
|
||||
): CronTerminalRecovery | undefined {
|
||||
const store = getCronStore(context);
|
||||
const job: CronJob | undefined = store?.jobs.find((entry) => entry.id === execution.jobId);
|
||||
if (!job || job.state.lastRunAtMs !== execution.startedAt) {
|
||||
return undefined;
|
||||
}
|
||||
const status = job.state.lastRunStatus ?? job.state.lastStatus;
|
||||
if (status !== "ok" && status !== "skipped" && status !== "error") {
|
||||
return undefined;
|
||||
}
|
||||
const durationMs =
|
||||
typeof job.state.lastDurationMs === "number" && Number.isFinite(job.state.lastDurationMs)
|
||||
? Math.max(0, job.state.lastDurationMs)
|
||||
: 0;
|
||||
const endedAt = execution.startedAt + durationMs;
|
||||
return {
|
||||
status: mapCronTerminalStatus(status, job.state.lastError),
|
||||
endedAt,
|
||||
lastEventAt: endedAt,
|
||||
...(job.state.lastError !== undefined ? { error: job.state.lastError } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function resolveDurableCronTaskRecovery(
|
||||
task: TaskRecord,
|
||||
context: CronRecoveryContext,
|
||||
): CronTerminalRecovery | undefined {
|
||||
if (task.runtime !== "cron" || !isActiveTask(task)) {
|
||||
return undefined;
|
||||
}
|
||||
const execution = parseCronExecutionId(task);
|
||||
if (!execution) {
|
||||
return undefined;
|
||||
}
|
||||
return (
|
||||
resolveCronRunLogRecovery(execution, context) ?? resolveCronJobStateRecovery(execution, context)
|
||||
);
|
||||
}
|
||||
|
||||
function hasActiveCliRun(task: TaskRecord): boolean {
|
||||
const candidateRunIds = [task.sourceId, task.runId];
|
||||
for (const candidate of candidateRunIds) {
|
||||
@@ -123,6 +304,9 @@ function hasActiveCliRun(task: TaskRecord): boolean {
|
||||
|
||||
function hasBackingSession(task: TaskRecord): boolean {
|
||||
if (task.runtime === "cron") {
|
||||
if (!taskRegistryMaintenanceRuntime.isCronRuntimeAuthoritative()) {
|
||||
return true;
|
||||
}
|
||||
const jobId = task.sourceId?.trim();
|
||||
return jobId ? taskRegistryMaintenanceRuntime.isCronJobActive(jobId) : false;
|
||||
}
|
||||
@@ -204,6 +388,41 @@ function markTaskLost(task: TaskRecord, now: number): TaskRecord {
|
||||
return updated;
|
||||
}
|
||||
|
||||
function markTaskRecovered(task: TaskRecord, recovery: CronTerminalRecovery): TaskRecord {
|
||||
const updated =
|
||||
taskRegistryMaintenanceRuntime.markTaskTerminalById({
|
||||
taskId: task.taskId,
|
||||
status: recovery.status,
|
||||
endedAt: recovery.endedAt,
|
||||
lastEventAt: recovery.lastEventAt,
|
||||
...(recovery.error !== undefined ? { error: recovery.error } : {}),
|
||||
...(recovery.terminalSummary !== undefined
|
||||
? { terminalSummary: recovery.terminalSummary }
|
||||
: {}),
|
||||
}) ?? projectTaskRecovered(task, recovery);
|
||||
void taskRegistryMaintenanceRuntime.maybeDeliverTaskTerminalUpdate(updated.taskId);
|
||||
return updated;
|
||||
}
|
||||
|
||||
function projectTaskRecovered(task: TaskRecord, recovery: CronTerminalRecovery): TaskRecord {
|
||||
const projected: TaskRecord = {
|
||||
...task,
|
||||
status: recovery.status,
|
||||
endedAt: recovery.endedAt,
|
||||
lastEventAt: recovery.lastEventAt,
|
||||
...(recovery.error !== undefined ? { error: recovery.error } : {}),
|
||||
...(recovery.terminalSummary !== undefined
|
||||
? { terminalSummary: recovery.terminalSummary }
|
||||
: {}),
|
||||
};
|
||||
return {
|
||||
...projected,
|
||||
...(typeof projected.cleanupAfter === "number"
|
||||
? {}
|
||||
: { cleanupAfter: resolveCleanupAfter(projected) }),
|
||||
};
|
||||
}
|
||||
|
||||
function projectTaskLost(task: TaskRecord, now: number): TaskRecord {
|
||||
const projected: TaskRecord = {
|
||||
...task,
|
||||
@@ -220,7 +439,14 @@ function projectTaskLost(task: TaskRecord, now: number): TaskRecord {
|
||||
};
|
||||
}
|
||||
|
||||
export function reconcileTaskRecordForOperatorInspection(task: TaskRecord): TaskRecord {
|
||||
export function reconcileTaskRecordForOperatorInspection(
|
||||
task: TaskRecord,
|
||||
context: CronRecoveryContext = createCronRecoveryContext(),
|
||||
): TaskRecord {
|
||||
const cronRecovery = resolveDurableCronTaskRecovery(task, context);
|
||||
if (cronRecovery) {
|
||||
return projectTaskRecovered(task, cronRecovery);
|
||||
}
|
||||
const now = Date.now();
|
||||
if (!shouldMarkLost(task, now)) {
|
||||
return task;
|
||||
@@ -230,9 +456,10 @@ export function reconcileTaskRecordForOperatorInspection(task: TaskRecord): Task
|
||||
|
||||
export function reconcileInspectableTasks(): TaskRecord[] {
|
||||
taskRegistryMaintenanceRuntime.ensureTaskRegistryReady();
|
||||
const cronRecoveryContext = createCronRecoveryContext();
|
||||
return taskRegistryMaintenanceRuntime
|
||||
.listTaskRecords()
|
||||
.map((task) => reconcileTaskRecordForOperatorInspection(task));
|
||||
.map((task) => reconcileTaskRecordForOperatorInspection(task, cronRecoveryContext));
|
||||
}
|
||||
|
||||
configureTaskAuditTaskProvider(reconcileInspectableTasks);
|
||||
@@ -253,15 +480,21 @@ export function reconcileTaskLookupToken(token: string): TaskRecord | undefined
|
||||
}
|
||||
|
||||
// Preview is synchronous and cannot call the async detached-task recovery hook,
|
||||
// so recovered tasks are counted under reconciled here. The real sweep
|
||||
// in runTaskRegistryMaintenance splits them into reconciled vs recovered.
|
||||
// so hook-recovered tasks are counted under reconciled here. Durable cron
|
||||
// recovery is synchronous and can be previewed exactly.
|
||||
export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary {
|
||||
taskRegistryMaintenanceRuntime.ensureTaskRegistryReady();
|
||||
const now = Date.now();
|
||||
let reconciled = 0;
|
||||
let recovered = 0;
|
||||
let cleanupStamped = 0;
|
||||
let pruned = 0;
|
||||
const cronRecoveryContext = createCronRecoveryContext();
|
||||
for (const task of taskRegistryMaintenanceRuntime.listTaskRecords()) {
|
||||
if (resolveDurableCronTaskRecovery(task, cronRecoveryContext)) {
|
||||
recovered += 1;
|
||||
continue;
|
||||
}
|
||||
if (shouldMarkLost(task, now)) {
|
||||
reconciled += 1;
|
||||
continue;
|
||||
@@ -274,7 +507,7 @@ export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary
|
||||
cleanupStamped += 1;
|
||||
}
|
||||
}
|
||||
return { reconciled, recovered: 0, cleanupStamped, pruned };
|
||||
return { reconciled, recovered, cleanupStamped, pruned };
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -305,12 +538,25 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
|
||||
let cleanupStamped = 0;
|
||||
let pruned = 0;
|
||||
const tasks = taskRegistryMaintenanceRuntime.listTaskRecords();
|
||||
const cronRecoveryContext = createCronRecoveryContext();
|
||||
let processed = 0;
|
||||
for (const task of tasks) {
|
||||
const current = taskRegistryMaintenanceRuntime.getTaskById(task.taskId);
|
||||
if (!current) {
|
||||
continue;
|
||||
}
|
||||
const cronRecovery = resolveDurableCronTaskRecovery(current, cronRecoveryContext);
|
||||
if (cronRecovery) {
|
||||
const next = markTaskRecovered(current, cronRecovery);
|
||||
if (next.status !== current.status) {
|
||||
recovered += 1;
|
||||
}
|
||||
processed += 1;
|
||||
if (processed % SWEEP_YIELD_BATCH_SIZE === 0) {
|
||||
await yieldToEventLoop();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (shouldMarkLost(current, now)) {
|
||||
const recovery = await tryRecoverTaskBeforeMarkLost({
|
||||
taskId: current.taskId,
|
||||
@@ -412,6 +658,18 @@ export function setTaskRegistryMaintenanceRuntimeForTests(
|
||||
|
||||
export function resetTaskRegistryMaintenanceRuntimeForTests(): void {
|
||||
taskRegistryMaintenanceRuntime = defaultTaskRegistryMaintenanceRuntime;
|
||||
configuredCronStorePath = undefined;
|
||||
configuredCronRuntimeAuthoritative = false;
|
||||
}
|
||||
|
||||
export function configureTaskRegistryMaintenance(options: {
|
||||
cronStorePath?: string;
|
||||
cronRuntimeAuthoritative?: boolean;
|
||||
}): void {
|
||||
configuredCronStorePath = options.cronStorePath?.trim() || undefined;
|
||||
if (options.cronRuntimeAuthoritative !== undefined) {
|
||||
configuredCronRuntimeAuthoritative = options.cronRuntimeAuthoritative;
|
||||
}
|
||||
}
|
||||
|
||||
export function getReconciledTaskById(taskId: string): TaskRecord | undefined {
|
||||
|
||||
@@ -129,6 +129,7 @@ function configureTaskRegistryMaintenanceRuntimeForTest(params: {
|
||||
params.currentTasks.set(patch.taskId, next);
|
||||
return next;
|
||||
},
|
||||
markTaskTerminalById: () => null,
|
||||
maybeDeliverTaskTerminalUpdate: async () => null,
|
||||
resolveTaskForLookupToken: () => undefined,
|
||||
setTaskCleanupAfterById: (patch: { taskId: string; cleanupAfter: number }) => {
|
||||
@@ -143,6 +144,11 @@ function configureTaskRegistryMaintenanceRuntimeForTest(params: {
|
||||
params.currentTasks.set(patch.taskId, next);
|
||||
return next;
|
||||
},
|
||||
isCronRuntimeAuthoritative: () => true,
|
||||
resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json",
|
||||
loadCronStoreSync: () => ({ version: 1, jobs: [] }),
|
||||
resolveCronRunLogPath: ({ jobId }) => jobId,
|
||||
readCronRunLogEntriesSync: () => [],
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1625,9 +1631,15 @@ describe("task-registry", () => {
|
||||
throw new Error("maintenance boom");
|
||||
},
|
||||
markTaskLostById: () => null,
|
||||
markTaskTerminalById: () => null,
|
||||
maybeDeliverTaskTerminalUpdate: async () => null,
|
||||
resolveTaskForLookupToken: () => undefined,
|
||||
setTaskCleanupAfterById: () => null,
|
||||
isCronRuntimeAuthoritative: () => true,
|
||||
resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json",
|
||||
loadCronStoreSync: () => ({ version: 1, jobs: [] }),
|
||||
resolveCronRunLogPath: ({ jobId }) => jobId,
|
||||
readCronRunLogEntriesSync: () => [],
|
||||
});
|
||||
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user