mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-27 20:18:49 +00:00
feat(tasks): explain stale-running maintenance decisions (#84691)
Add JSON-only task maintenance diagnostics for stale running tasks and include the maintainer changelog entry.
This commit is contained in:
@@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- CLI/tasks: include stale-running task maintenance decisions in `openclaw tasks maintenance --json` so retained and reconcile candidates explain backing-session, cron, CLI, and wedged-subagent state. (#84691) Thanks @efpiva.
|
||||
- Codex app-server: keep system-prompt reports working when bootstrap hooks provide workspace files with only a path and content, so hook-supplied SOUL/IDENTITY/TOOLS/USER context still reports injected characters correctly. (#84736) Thanks @JARVIS-Glasses.
|
||||
- Providers/MiniMax music: stop advertising `durationSeconds` control and remove prompt-injected duration hints, so `music_generate` reports MiniMax duration as an unsupported override instead of suggesting MiniMax can enforce track length. Fixes #84508. Thanks @neeravmakwana.
|
||||
- WhatsApp: update Baileys to `7.0.0-rc12`.
|
||||
|
||||
@@ -11,6 +11,7 @@ import {
|
||||
resetTaskRegistryDeliveryRuntimeForTests,
|
||||
resetTaskRegistryForTests,
|
||||
} from "../tasks/task-registry.js";
|
||||
import * as taskRegistryMaintenance from "../tasks/task-registry.maintenance.js";
|
||||
import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js";
|
||||
import type { OpenClawTestState } from "../test-utils/openclaw-test-state.js";
|
||||
import { tasksAuditCommand, tasksMaintenanceCommand } from "./tasks.js";
|
||||
@@ -141,6 +142,148 @@ describe("tasks commands", () => {
|
||||
});
|
||||
});
|
||||
|
||||
// Dry-run (`apply: false`) JSON maintenance output must list a stale running
// subagent task as "retained" with reason "backing_session_present" when the
// session store still contains its child session row.
it("explains stale running tasks retained by backing sessions in maintenance JSON", async () => {
  await withTaskCommandStateDir(async (state) => {
    const now = Date.now();
    vi.useFakeTimers();
    try {
      // Create the task while the fake clock reads 45 minutes before `now`,
      // then move the clock back to `now` before running maintenance.
      vi.setSystemTime(now - 45 * 60_000);
      const childSessionKey = "agent:main:subagent:child-retained";
      const task = createTaskRecord({
        runtime: "subagent",
        ownerKey: "agent:main:main",
        scopeKind: "session",
        childSessionKey,
        runId: "run-retained-child",
        status: "running",
        task: "Review retained child session",
      });
      vi.setSystemTime(now);

      // Write a session store row keyed by the task's child session key.
      const sessionsDir = state.sessionsDir("main");
      await fs.mkdir(sessionsDir, { recursive: true });
      await fs.writeFile(
        path.join(sessionsDir, "sessions.json"),
        JSON.stringify(
          {
            [childSessionKey]: {
              sessionId: "child-retained",
              updatedAt: now,
            },
          },
          null,
          2,
        ),
        "utf8",
      );

      const runtime = createRuntime();
      await tasksMaintenanceCommand({ json: true, apply: false }, runtime);

      const payload = readFirstJsonLog(runtime) as {
        diagnostics: {
          staleRunningTasks: Array<{
            taskId: string;
            decision: string;
            reason: string;
            childSessionKey?: string;
          }>;
        };
      };

      expect(payload.diagnostics.staleRunningTasks).toContainEqual(
        expect.objectContaining({
          taskId: task.taskId,
          decision: "retained",
          reason: "backing_session_present",
          childSessionKey,
        }),
      );
    } finally {
      // Always restore the real clock so a failing assertion cannot leak fake
      // timers into later tests.
      vi.useRealTimers();
    }
  });
});
|
||||
|
||||
// With `apply: true`, the JSON diagnostics must still describe the task as
// retained by its backing session, even though the same command run prunes that
// session row from the store afterwards.
it("explains task maintenance decisions before applying session registry pruning", async () => {
  await withTaskCommandStateDir(async (state) => {
    const now = Date.now();
    vi.useFakeTimers();
    try {
      // Create the task while the fake clock reads 45 minutes before `now`,
      // then move the clock back to `now` before running maintenance.
      vi.setSystemTime(now - 45 * 60_000);
      const childSessionKey = "agent:main:cron:done-job:run:old-run";
      const task = createTaskRecord({
        runtime: "subagent",
        ownerKey: "agent:main:main",
        scopeKind: "session",
        childSessionKey,
        runId: "run-backed-before-session-sweep",
        status: "running",
        task: "Review old cron child session",
      });
      vi.setSystemTime(now);

      // The session row was last updated 8 days ago; the assertions below
      // expect the session sweep to prune it.
      const sessionsDir = state.sessionsDir("main");
      const storePath = path.join(sessionsDir, "sessions.json");
      await fs.mkdir(sessionsDir, { recursive: true });
      await fs.writeFile(
        storePath,
        JSON.stringify(
          {
            [childSessionKey]: {
              sessionId: "old-run",
              updatedAt: now - 8 * 24 * 60 * 60_000,
            },
          },
          null,
          2,
        ),
        "utf8",
      );

      const runtime = createRuntime();
      await tasksMaintenanceCommand({ json: true, apply: true }, runtime);

      const payload = readFirstJsonLog(runtime) as {
        maintenance: {
          tasks: { reconciled: number };
          sessions: { pruned: number };
        };
        diagnostics: {
          staleRunningTasks: Array<{
            taskId: string;
            decision: string;
            reason: string;
            childSessionKey?: string;
          }>;
        };
      };

      expect(payload.maintenance.tasks.reconciled).toBe(0);
      expect(payload.maintenance.sessions.pruned).toBe(1);
      expect(payload.diagnostics.staleRunningTasks).toContainEqual(
        expect.objectContaining({
          taskId: task.taskId,
          decision: "retained",
          reason: "backing_session_present",
          childSessionKey,
        }),
      );

      // The session row itself must be gone from the store after the run.
      const updated = JSON.parse(await fs.readFile(storePath, "utf8")) as Record<string, unknown>;
      expect(updated[childSessionKey]).toBeUndefined();
    } finally {
      // Always restore the real clock so a failing assertion cannot leak fake
      // timers into later tests.
      vi.useRealTimers();
    }
  });
});
|
||||
|
||||
// Text (non-JSON) maintenance output must not call the diagnostics builder.
it("does not build JSON-only diagnostics for text maintenance output", async () => {
  await withTaskCommandStateDir(async () => {
    const diagnosticsSpy = vi.spyOn(
      taskRegistryMaintenance,
      "getTaskRegistryMaintenanceDiagnostics",
    );
    try {
      const runtime = createRuntime();

      await tasksMaintenanceCommand({ json: false, apply: false }, runtime);

      expect(diagnosticsSpy).not.toHaveBeenCalled();
    } finally {
      // Restore the original export so the spy does not outlive this test,
      // even when the assertion above fails.
      diagnosticsSpy.mockRestore();
    }
  });
});
|
||||
|
||||
it("keeps tasks maintenance JSON additive for TaskFlow state", async () => {
|
||||
await withTaskCommandStateDir(async () => {
|
||||
const now = Date.now();
|
||||
|
||||
@@ -38,6 +38,7 @@ import {
|
||||
getInspectableTaskAuditSummary,
|
||||
getInspectableTaskRegistrySummary,
|
||||
configureTaskRegistryMaintenance,
|
||||
getTaskRegistryMaintenanceDiagnostics,
|
||||
previewTaskRegistryMaintenance,
|
||||
runTaskRegistryMaintenance,
|
||||
} from "../tasks/task-registry.maintenance.js";
|
||||
@@ -664,6 +665,9 @@ export async function tasksMaintenanceCommand(
|
||||
const taskMaintenance = opts.apply
|
||||
? await runTaskRegistryMaintenance()
|
||||
: previewTaskRegistryMaintenance();
|
||||
// JSON diagnostics explain the task-maintenance decision above, before the
|
||||
// separate session-registry sweep can prune backing session rows.
|
||||
const diagnostics = opts.json ? getTaskRegistryMaintenanceDiagnostics() : undefined;
|
||||
const flowMaintenance = opts.apply
|
||||
? await runTaskFlowRegistryMaintenance()
|
||||
: previewTaskFlowRegistryMaintenance();
|
||||
@@ -683,6 +687,7 @@ export async function tasksMaintenanceCommand(
|
||||
sessions: sessionMaintenance,
|
||||
},
|
||||
tasks: summary,
|
||||
diagnostics,
|
||||
auditBefore: {
|
||||
...auditBefore,
|
||||
taskFlows: flowAuditBefore,
|
||||
|
||||
@@ -11,6 +11,7 @@ import {
|
||||
} from "./detached-task-runtime.js";
|
||||
import {
|
||||
getInspectableActiveTaskRestartBlockers,
|
||||
getTaskRegistryMaintenanceDiagnostics,
|
||||
previewTaskRegistryMaintenance,
|
||||
reconcileInspectableTasks,
|
||||
resetTaskRegistryMaintenanceRuntimeForTests,
|
||||
@@ -315,10 +316,14 @@ describe("task-registry maintenance issue #60299", () => {
|
||||
|
||||
it("marks subagent tasks lost when their child session recovery is tombstoned", async () => {
|
||||
const childSessionKey = "agent:main:subagent:wedged-child";
|
||||
const staleAt = Date.now() - 45 * 60_000;
|
||||
const task = makeStaleTask({
|
||||
runtime: "subagent",
|
||||
runId: "run-wedged-child",
|
||||
childSessionKey,
|
||||
createdAt: staleAt,
|
||||
startedAt: staleAt,
|
||||
lastEventAt: staleAt,
|
||||
});
|
||||
|
||||
const { currentTasks } = createTaskRegistryMaintenanceHarness({
|
||||
@@ -340,6 +345,14 @@ describe("task-registry maintenance issue #60299", () => {
|
||||
});
|
||||
|
||||
expectMaintenanceCounts(previewTaskRegistryMaintenance(), { reconciled: 1 });
|
||||
expect(getTaskRegistryMaintenanceDiagnostics().staleRunningTasks).toContainEqual(
|
||||
expect.objectContaining({
|
||||
taskId: task.taskId,
|
||||
decision: "would_reconcile",
|
||||
reason: "subagent_recovery_wedged",
|
||||
detail: "subagent orphan recovery blocked after 2 rapid accepted resume attempts",
|
||||
}),
|
||||
);
|
||||
expectMaintenanceCounts(await runTaskRegistryMaintenance(), { reconciled: 1 });
|
||||
const storedTask = requireTaskRecord(currentTasks, task.taskId);
|
||||
expect(storedTask.status).toBe("lost");
|
||||
@@ -366,7 +379,7 @@ describe("task-registry maintenance issue #60299", () => {
|
||||
});
|
||||
|
||||
it("recovers finished cron tasks from durable run logs before marking them lost", async () => {
|
||||
const startedAt = Date.now() - GRACE_EXPIRED_MS;
|
||||
const startedAt = Date.now() - 60 * 60_000;
|
||||
const task = makeStaleTask({
|
||||
runtime: "cron",
|
||||
sourceId: "cron-job-run-log-ok",
|
||||
@@ -399,6 +412,7 @@ describe("task-registry maintenance issue #60299", () => {
|
||||
expect(reconciledTasks[0]?.endedAt).toBe(startedAt + 1250);
|
||||
expect(reconciledTasks[0]?.terminalSummary).toBe("done");
|
||||
expectMaintenanceCounts(previewTaskRegistryMaintenance(), { reconciled: 0, recovered: 1 });
|
||||
expect(getTaskRegistryMaintenanceDiagnostics().staleRunningTasks).toHaveLength(0);
|
||||
expectMaintenanceCounts(await runTaskRegistryMaintenance(), { reconciled: 0, recovered: 1 });
|
||||
const storedTask = requireTaskRecord(currentTasks, task.taskId);
|
||||
expect(storedTask.status).toBe("succeeded");
|
||||
|
||||
@@ -64,6 +64,7 @@ import type { TaskRecord, TaskRegistrySummary, TaskStatus } from "./task-registr
|
||||
const log = createSubsystemLogger("tasks/task-registry-maintenance");
|
||||
const TASK_RECONCILE_GRACE_MS = 5 * 60_000;
|
||||
const CHILDLESS_CODEX_NATIVE_RECONCILE_GRACE_MS = 30 * 60_000;
|
||||
const TASK_STALE_RUNNING_MS = 30 * 60_000;
|
||||
const TASK_RETENTION_MS = 7 * 24 * 60 * 60_000;
|
||||
const TASK_SWEEP_INTERVAL_MS = 60_000;
|
||||
|
||||
@@ -162,6 +163,28 @@ export type TaskRegistryMaintenanceSummary = {
|
||||
pruned: number;
|
||||
};
|
||||
|
||||
/**
 * One diagnostic row describing how task maintenance treats a single task.
 *
 * Required fields come first; optional fields are only present when the
 * producer has a value for them.
 */
export type TaskRegistryMaintenanceTaskDiagnostic = {
  /** Identifier of the task this row describes. */
  taskId: string;
  /** Runtime of the task, copied from the task record. */
  runtime: TaskRecord["runtime"];
  /** Status of the task, copied from the task record. */
  status: TaskRecord["status"];
  /** Whether maintenance keeps the task as-is or would reconcile it. */
  decision: "retained" | "would_reconcile";
  /** Machine-readable cause for the decision. */
  reason:
    | "active_cli_run"
    | "backing_session_missing"
    | "backing_session_present"
    | "cron_runtime_not_authoritative"
    | "lost_grace_pending"
    | "subagent_recovery_wedged";
  /** Non-negative age of the task in milliseconds at the time of the report. */
  ageMs: number;
  /** Optional human-readable elaboration of `reason`. */
  detail?: string;
  /** Child session key of the task, when it has one. */
  childSessionKey?: string;
  /** Run identifier of the task, when it has one. */
  runId?: string;
};
|
||||
|
||||
/** Diagnostics payload returned alongside task maintenance results. */
export type TaskRegistryMaintenanceDiagnostics = {
  /** One entry per stale running task that maintenance inspected. */
  staleRunningTasks: Array<TaskRegistryMaintenanceTaskDiagnostic>;
};
|
||||
|
||||
type CronExecutionId = {
|
||||
jobId: string;
|
||||
startedAt: number;
|
||||
@@ -562,6 +585,10 @@ function resolveCleanupAfter(task: TaskRecord): number {
|
||||
return terminalAt + TASK_RETENTION_MS;
|
||||
}
|
||||
|
||||
/**
 * Picks the timestamp used to measure a task's age: the last event time when
 * set, otherwise the start time, otherwise the creation time.
 */
function taskReferenceAt(task: TaskRecord): number {
  const lastActivityAt = task.lastEventAt ?? task.startedAt;
  // Only null/undefined fall through; a timestamp of 0 is kept.
  return lastActivityAt ?? task.createdAt;
}
|
||||
|
||||
function getNormalizedTaskChildSessionKey(task: TaskRecord): string | undefined {
|
||||
return normalizeOptionalString(task.childSessionKey);
|
||||
}
|
||||
@@ -952,6 +979,72 @@ export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary
|
||||
return { reconciled, recovered, cleanupStamped, pruned };
|
||||
}
|
||||
|
||||
/**
 * Explains whether an active task is retained or would be reconciled.
 *
 * Checks run in a fixed order and the first match wins:
 * 1. lost-grace window not yet expired -> retained (`lost_grace_pending`)
 * 2. subagent task whose session entry is recovery-wedged -> would reconcile
 * 3. no backing session -> would reconcile (`backing_session_missing`)
 * 4. cron task while the cron runtime is not authoritative -> retained
 * 5. CLI task with an active CLI run -> retained
 * 6. otherwise -> retained (`backing_session_present`)
 *
 * @param params.task - Task record being explained.
 * @param params.now - Current time in epoch milliseconds.
 * @param params.context - Lookup context used for session queries.
 * @returns The decision, its reason, and an optional detail string.
 */
function explainActiveTaskRetention(params: {
  task: TaskRecord;
  now: number;
  context: BackingSessionLookupContext;
}): Pick<TaskRegistryMaintenanceTaskDiagnostic, "decision" | "reason" | "detail"> {
  const { task, now, context } = params;

  const graceExpired = hasLostGraceExpired(task, now);
  if (!graceExpired) {
    return { decision: "retained", reason: "lost_grace_pending" };
  }

  if (task.runtime === "subagent") {
    const sessionEntry = findTaskSessionEntry(task, context);
    if (sessionEntry && isSubagentRecoveryWedgedEntry(sessionEntry)) {
      const detail = formatSubagentRecoveryWedgedReason(sessionEntry);
      return { decision: "would_reconcile", reason: "subagent_recovery_wedged", detail };
    }
  }

  const backed = hasBackingSession(task, context);
  if (!backed) {
    return { decision: "would_reconcile", reason: "backing_session_missing" };
  }

  // Runtime-specific retention reasons; any other runtime falls through.
  switch (task.runtime) {
    case "cron":
      if (!taskRegistryMaintenanceRuntime.isCronRuntimeAuthoritative()) {
        return { decision: "retained", reason: "cron_runtime_not_authoritative" };
      }
      break;
    case "cli":
      if (hasActiveCliRun(task)) {
        return { decision: "retained", reason: "active_cli_run" };
      }
      break;
    default:
      break;
  }

  return { decision: "retained", reason: "backing_session_present" };
}
|
||||
|
||||
/**
 * Builds diagnostics for running tasks whose age is at least
 * `TASK_STALE_RUNNING_MS`.
 *
 * Tasks are skipped when they are not in the "running" status, when they are
 * younger than the threshold, or when a durable cron recovery resolves for
 * them. Every remaining task gets one row with the retention decision from
 * `explainActiveTaskRetention`.
 *
 * @returns The collected stale-running-task diagnostics.
 */
export function getTaskRegistryMaintenanceDiagnostics(): TaskRegistryMaintenanceDiagnostics {
  taskRegistryMaintenanceRuntime.ensureTaskRegistryReady();
  const now = Date.now();
  const cronRecoveryContext = createCronRecoveryContext();
  const backingSessionContext = createBackingSessionLookupContext();
  const staleRunningTasks: TaskRegistryMaintenanceTaskDiagnostic[] = [];

  for (const task of taskRegistryMaintenanceRuntime.listTaskRecords()) {
    if (task.status !== "running") {
      continue;
    }

    // Age is clamped at zero so a reference timestamp in the future cannot
    // produce a negative value.
    const ageMs = Math.max(0, now - taskReferenceAt(task));
    // Short-circuit keeps the recovery lookup from running for fresh tasks.
    if (ageMs < TASK_STALE_RUNNING_MS || resolveDurableCronTaskRecovery(task, cronRecoveryContext)) {
      continue;
    }

    const verdict = explainActiveTaskRetention({ task, now, context: backingSessionContext });
    const diagnostic: TaskRegistryMaintenanceTaskDiagnostic = {
      taskId: task.taskId,
      runtime: task.runtime,
      status: task.status,
      decision: verdict.decision,
      reason: verdict.reason,
      ageMs,
    };
    // Optional keys are added only when truthy, in this order, so the
    // serialized key order stays stable.
    if (verdict.detail) {
      diagnostic.detail = verdict.detail;
    }
    if (task.childSessionKey) {
      diagnostic.childSessionKey = task.childSessionKey;
    }
    if (task.runId) {
      diagnostic.runId = task.runId;
    }
    staleRunningTasks.push(diagnostic);
  }

  return { staleRunningTasks };
}
|
||||
|
||||
/**
|
||||
* Yield control back to the event loop so that pending I/O callbacks,
|
||||
* timers, and incoming requests can be processed between batches of
|
||||
|
||||
Reference in New Issue
Block a user