fix: reconcile stale cron and chat-backed tasks (#60310) (thanks @lml2468)

This commit is contained in:
Peter Steinberger
2026-04-04 16:56:16 +09:00
parent 7036e5afbf
commit 7d1575b5df
6 changed files with 114 additions and 39 deletions

View File

@@ -60,6 +60,7 @@ Docs: https://docs.openclaw.ai
- Update/npm: prefer the npm binary that owns the installed global OpenClaw prefix so mixed Homebrew-plus-nvm setups update the right install. (#60153) Thanks @jayeshp19.
- Gateway/plugin routes: keep gateway-auth plugin runtime routes on write-only fallback scopes unless a trusted-proxy caller explicitly declares narrower `x-openclaw-scopes`, so plugin HTTP handlers no longer mint admin-level runtime scopes on missing or untrusted HTTP scope headers. (#59815) Thanks @pgondhi987.
- Agents/exec approvals: let `exec-approvals.json` agent security override stricter gateway tool defaults so approved subagents can use `security: "full"` without falling back to allowlist enforcement again. (#60310) Thanks @lml2468.
- Tasks/maintenance: reconcile stale cron and chat-backed CLI task rows against live cron-job and agent-run ownership instead of treating any persisted session key as proof that the task is still running. (#60310) Thanks @lml2468.
## 2026.4.2

38
src/cron/active-jobs.ts Normal file
View File

@@ -0,0 +1,38 @@
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
type CronActiveJobState = {
activeJobIds: Set<string>;
};
const CRON_ACTIVE_JOB_STATE_KEY = Symbol.for("openclaw.cron.activeJobs");
function getCronActiveJobState(): CronActiveJobState {
return resolveGlobalSingleton<CronActiveJobState>(CRON_ACTIVE_JOB_STATE_KEY, () => ({
activeJobIds: new Set<string>(),
}));
}
export function markCronJobActive(jobId: string) {
if (!jobId) {
return;
}
getCronActiveJobState().activeJobIds.add(jobId);
}
export function clearCronJobActive(jobId: string) {
if (!jobId) {
return;
}
getCronActiveJobState().activeJobIds.delete(jobId);
}
export function isCronJobActive(jobId: string) {
if (!jobId) {
return false;
}
return getCronActiveJobState().activeJobIds.has(jobId);
}
export function resetCronActiveJobsForTests() {
getCronActiveJobState().activeJobIds.clear();
}

View File

@@ -7,6 +7,7 @@ import {
createRunningTaskRun,
failTaskRunByRunId,
} from "../../tasks/task-executor.js";
import { clearCronJobActive, markCronJobActive } from "../active-jobs.js";
import { resolveCronDeliveryPlan } from "../delivery-plan.js";
import { sweepCronRunSessions } from "../session-reaper.js";
import type {
@@ -561,6 +562,7 @@ export function applyJobResult(
}
function applyOutcomeToStoredJob(state: CronServiceState, result: TimedCronRunOutcome): void {
clearCronJobActive(result.jobId);
tryFinishCronTaskRun(state, result);
const store = state.store;
if (!store) {
@@ -716,6 +718,7 @@ export async function onTimer(state: CronServiceState) {
const { id, job } = params;
const startedAt = state.deps.nowMs();
job.state.runningAtMs = startedAt;
markCronJobActive(job.id);
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
const jobTimeoutMs = resolveCronJobTimeoutMs(job);
const taskRunId = tryCreateCronTaskRun({ state, job, startedAt });
@@ -1299,6 +1302,7 @@ export async function executeJob(
const startedAt = state.deps.nowMs();
job.state.runningAtMs = startedAt;
job.state.lastError = undefined;
markCronJobActive(job.id);
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
let coreResult: {
@@ -1327,6 +1331,7 @@ export async function executeJob(
state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
emit(state, { jobId: job.id, action: "removed" });
}
clearCronJobActive(job.id);
}
function emitJobFinished(

View File

@@ -26,11 +26,15 @@ async function loadMaintenanceModule(params: {
tasks: TaskRecord[];
sessionStore?: Record<string, unknown>;
acpEntry?: unknown;
activeCronJobIds?: string[];
activeRunIds?: string[];
}) {
vi.resetModules();
const sessionStore = params.sessionStore ?? {};
const acpEntry = params.acpEntry;
const activeCronJobIds = new Set(params.activeCronJobIds ?? []);
const activeRunIds = new Set(params.activeRunIds ?? []);
const currentTasks = new Map(params.tasks.map((task) => [task.taskId, { ...task }]));
vi.doMock("../acp/runtime/session-meta.js", () => ({
@@ -45,6 +49,15 @@ async function loadMaintenanceModule(params: {
resolveStorePath: () => "",
}));
vi.doMock("../cron/active-jobs.js", () => ({
isCronJobActive: (jobId: string) => activeCronJobIds.has(jobId),
}));
vi.doMock("../infra/agent-events.js", () => ({
getAgentRunContext: (runId: string) =>
activeRunIds.has(runId) ? { sessionKey: "main" } : undefined,
}));
vi.doMock("./runtime-internal.js", () => ({
deleteTaskRecordById: (taskId: string) => currentTasks.delete(taskId),
ensureTaskRegistryReady: () => {},
@@ -90,22 +103,11 @@ async function loadMaintenanceModule(params: {
}
describe("task-registry maintenance issue #60299", () => {
it("marks cron tasks with no child session key lost after the grace period", async () => {
const task = makeStaleTask({
runtime: "cron",
childSessionKey: undefined,
});
const { mod, currentTasks } = await loadMaintenanceModule({ tasks: [task] });
expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 1 });
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "lost" });
});
it("marks cron tasks lost even if their transient child key still exists in the session store", async () => {
it("marks stale cron tasks lost once the runtime no longer tracks the job as active", async () => {
const childSessionKey = "agent:main:slack:channel:test-channel";
const task = makeStaleTask({
runtime: "cron",
sourceId: "cron-job-1",
childSessionKey,
});
@@ -118,10 +120,28 @@ describe("task-registry maintenance issue #60299", () => {
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "lost" });
});
it("treats cli tasks backed only by a persistent chat session as stale", async () => {
it("keeps active cron tasks live while the cron runtime still owns the job", async () => {
const task = makeStaleTask({
runtime: "cron",
sourceId: "cron-job-2",
childSessionKey: undefined,
});
const { mod, currentTasks } = await loadMaintenanceModule({
tasks: [task],
activeCronJobIds: ["cron-job-2"],
});
expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 });
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" });
});
it("marks chat-backed cli tasks lost after the owning run context disappears", async () => {
const channelKey = "agent:main:slack:channel:C1234567890";
const task = makeStaleTask({
runtime: "cli",
sourceId: "run-chat-cli-stale",
runId: "run-chat-cli-stale",
ownerKey: "agent:main:main",
requesterSessionKey: channelKey,
childSessionKey: channelKey,
@@ -136,18 +156,21 @@ describe("task-registry maintenance issue #60299", () => {
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "lost" });
});
it("keeps subagent tasks live while their child session still exists", async () => {
const childKey = "agent:main:subagent:abc123";
it("keeps chat-backed cli tasks live while the owning run context is still active", async () => {
const channelKey = "agent:main:slack:channel:C1234567890";
const task = makeStaleTask({
runtime: "subagent",
runtime: "cli",
sourceId: "run-chat-cli-live",
runId: "run-chat-cli-live",
ownerKey: "agent:main:main",
requesterSessionKey: "agent:main:main",
childSessionKey: childKey,
requesterSessionKey: channelKey,
childSessionKey: channelKey,
});
const { mod, currentTasks } = await loadMaintenanceModule({
tasks: [task],
sessionStore: { [childKey]: { updatedAt: Date.now() } },
sessionStore: { [channelKey]: { updatedAt: Date.now() } },
activeRunIds: ["run-chat-cli-live"],
});
expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 });

View File

@@ -1,5 +1,7 @@
import { readAcpSessionEntry } from "../acp/runtime/session-meta.js";
import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
import { isCronJobActive } from "../cron/active-jobs.js";
import { getAgentRunContext } from "../infra/agent-events.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import { deriveSessionChatType } from "../sessions/session-chat-type.js";
import {
@@ -64,16 +66,25 @@ function hasLostGraceExpired(task: TaskRecord, now: number): boolean {
return now - referenceAt >= TASK_RECONCILE_GRACE_MS;
}
/**
* Returns false if the task's runtime is cron, since cron tasks do not maintain
* a persistent child session after the job exits.
*
* For cli tasks, long-lived channel/group/direct session-store entries do not
* imply task liveness, so only agent-scoped non-chat child sessions count.
*/
function hasActiveCliRun(task: TaskRecord): boolean {
const candidateRunIds = [task.sourceId, task.runId];
for (const candidate of candidateRunIds) {
const runId = candidate?.trim();
if (runId && getAgentRunContext(runId)) {
return true;
}
}
return false;
}
function hasBackingSession(task: TaskRecord): boolean {
if (task.runtime === "cron") {
return false;
const jobId = task.sourceId?.trim();
return jobId ? isCronJobActive(jobId) : false;
}
if (task.runtime === "cli" && hasActiveCliRun(task)) {
return true;
}
const childSessionKey = task.childSessionKey?.trim();
@@ -89,17 +100,12 @@ function hasBackingSession(task: TaskRecord): boolean {
}
return Boolean(acpEntry.entry);
}
if (task.runtime === "subagent") {
const agentId = parseAgentSessionKey(childSessionKey)?.agentId;
const storePath = resolveStorePath(undefined, { agentId });
const store = loadSessionStore(storePath);
return Boolean(findSessionEntryByKey(store, childSessionKey));
}
if (task.runtime === "cli") {
const chatType = deriveSessionChatType(childSessionKey);
if (chatType === "channel" || chatType === "group" || chatType === "direct") {
return false;
if (task.runtime === "subagent" || task.runtime === "cli") {
if (task.runtime === "cli") {
const chatType = deriveSessionChatType(childSessionKey);
if (chatType === "channel" || chatType === "group" || chatType === "direct") {
return false;
}
}
const agentId = parseAgentSessionKey(childSessionKey)?.agentId;
const storePath = resolveStorePath(undefined, { agentId });

View File

@@ -1,5 +1,6 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { startAcpSpawnParentStreamRelay } from "../agents/acp-spawn-parent-stream.js";
import { resetCronActiveJobsForTests } from "../cron/active-jobs.js";
import {
emitAgentEvent,
registerAgentRunContext,
@@ -226,6 +227,7 @@ describe("task-registry", () => {
resetSystemEventsForTest();
resetHeartbeatWakeStateForTests();
resetAgentRunContextForTest();
resetCronActiveJobsForTests();
resetTaskRegistryDeliveryRuntimeForTests();
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });