feat(status): surface task run pressure (#57350)

* feat(status): surface task run pressure

* Update src/commands/tasks.ts

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

---------

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
This commit is contained in:
Vincent Koc
2026-03-29 17:09:10 -07:00
committed by GitHub
parent 93dd25e6b2
commit e6445c22aa
18 changed files with 275 additions and 7 deletions

View File

@@ -385,6 +385,18 @@ export async function statusCommand(
: "";
const eventsValue =
summary.queuedSystemEvents.length > 0 ? `${summary.queuedSystemEvents.length} queued` : "none";
const tasksValue =
summary.tasks.total > 0
? [
`${summary.tasks.active} active`,
`${summary.tasks.byStatus.queued} queued`,
`${summary.tasks.byStatus.running} running`,
summary.tasks.failures > 0
? warn(`${summary.tasks.failures} issue${summary.tasks.failures === 1 ? "" : "s"}`)
: muted("no issues"),
`${summary.tasks.total} tracked`,
].join(" · ")
: muted("none");
const probesValue = health ? ok("enabled") : muted("skipped (use --deep)");
@@ -502,6 +514,7 @@ export async function statusCommand(
{ Item: "Plugin compatibility", Value: pluginCompatibilityValue },
{ Item: "Probes", Value: probesValue },
{ Item: "Events", Value: eventsValue },
{ Item: "Tasks", Value: tasksValue },
{ Item: "Heartbeat", Value: heartbeatValue },
...(lastHeartbeatValue ? [{ Item: "Last heartbeat", Value: lastHeartbeatValue }] : []),
{

View File

@@ -3,6 +3,7 @@ import type { UpdateCheckResult } from "../infra/update-check.js";
import { loggingState } from "../logging/state.js";
import { runExec } from "../process/exec.js";
import type { RuntimeEnv } from "../runtime.js";
import { createEmptyTaskRegistrySummary } from "../tasks/task-registry.summary.js";
import type { getAgentLocalStatuses as getAgentLocalStatusesFn } from "./status.agent-local.js";
import type { StatusScanResult } from "./status.scan.js";
import {
@@ -72,6 +73,7 @@ function buildColdStartStatusSummary(): Awaited<ReturnType<typeof getStatusSumma
},
channelSummary: [],
queuedSystemEvents: [],
tasks: createEmptyTaskRegistrySummary(),
sessions: {
paths: [],
count: 0,

View File

@@ -214,6 +214,27 @@ export function createStatusSummary(
) {
return {
linkChannel: options.linkChannel,
tasks: {
total: 0,
active: 0,
terminal: 0,
failures: 0,
byStatus: {
queued: 0,
running: 0,
succeeded: 0,
failed: 0,
timed_out: 0,
cancelled: 0,
lost: 0,
},
byRuntime: {
subagent: 0,
acp: 0,
cli: 0,
cron: 0,
},
},
sessions: {
count: 0,
paths: [],

View File

@@ -17,6 +17,7 @@ import {
import { runExec } from "../process/exec.js";
import type { RuntimeEnv } from "../runtime.js";
import { createLazyRuntimeSurface } from "../shared/lazy-runtime.js";
import { createEmptyTaskRegistrySummary } from "../tasks/task-registry.summary.js";
import type { buildChannelsTable as buildChannelsTableFn } from "./status-all/channels.js";
import type { getAgentLocalStatuses as getAgentLocalStatusesFn } from "./status.agent-local.js";
import { buildColdStartUpdateResult, scanStatusJsonCore } from "./status.scan.json-core.js";
@@ -172,6 +173,7 @@ function buildColdStartStatusSummary(): Awaited<ReturnType<typeof getStatusSumma
},
channelSummary: [],
queuedSystemEvents: [],
tasks: createEmptyTaskRegistrySummary(),
sessions: {
paths: [],
count: 0,

View File

@@ -29,6 +29,27 @@ describe("redactSensitiveStatusSummary", () => {
},
channelSummary: ["ok"],
queuedSystemEvents: ["none"],
tasks: {
total: 2,
active: 1,
terminal: 1,
failures: 1,
byStatus: {
queued: 1,
running: 0,
succeeded: 0,
failed: 1,
timed_out: 0,
cancelled: 0,
lost: 0,
},
byRuntime: {
subagent: 0,
acp: 1,
cli: 0,
cron: 1,
},
},
sessions: {
paths: ["/tmp/openclaw/sessions.json"],
count: 1,
@@ -54,5 +75,6 @@ describe("redactSensitiveStatusSummary", () => {
expect(redacted.runtimeVersion).toBe("2026.3.8");
expect(redacted.heartbeat).toEqual(input.heartbeat);
expect(redacted.channelSummary).toEqual(input.channelSummary);
expect(redacted.tasks).toEqual(input.tasks);
});
});

View File

@@ -25,7 +25,7 @@ vi.mock("../agents/defaults.js", () => ({
DEFAULT_PROVIDER: "openai",
}));
vi.mock("../config/config.js", () => ({
vi.mock("../config/io.js", () => ({
loadConfig: vi.fn(() => ({})),
}));
@@ -59,6 +59,30 @@ vi.mock("../infra/system-events.js", () => ({
peekSystemEvents: vi.fn(() => []),
}));
vi.mock("../tasks/task-registry.maintenance.js", () => ({
getInspectableTaskRegistrySummary: vi.fn(() => ({
total: 0,
active: 0,
terminal: 0,
failures: 0,
byStatus: {
queued: 0,
running: 0,
succeeded: 0,
failed: 0,
timed_out: 0,
cancelled: 0,
lost: 0,
},
byRuntime: {
subagent: 0,
acp: 0,
cli: 0,
cron: 0,
},
})),
}));
vi.mock("../routing/session-key.js", () => ({
normalizeAgentId: vi.fn((value: string) => value),
normalizeMainKey: vi.fn((value?: string) => value ?? "main"),
@@ -76,8 +100,13 @@ vi.mock("./status.link-channel.js", () => ({
const { hasPotentialConfiguredChannels } = await import("../channels/config-presence.js");
const { buildChannelSummary } = await import("../infra/channel-summary.js");
const { resolveLinkChannelContext } = await import("./status.link-channel.js");
const { statusSummaryRuntime } = await import("./status.summary.runtime.js");
const { getStatusSummary } = await import("./status.summary.js");
async function loadStatusSummaryForTest() {
vi.resetModules();
const { getStatusSummary } = await import("./status.summary.js");
const { statusSummaryRuntime } = await import("./status.summary.runtime.js");
return { getStatusSummary, statusSummaryRuntime };
}
describe("getStatusSummary", () => {
beforeEach(() => {
@@ -85,14 +114,17 @@ describe("getStatusSummary", () => {
});
it("includes runtimeVersion in the status payload", async () => {
const { getStatusSummary } = await loadStatusSummaryForTest();
const summary = await getStatusSummary();
expect(summary.runtimeVersion).toBe("2026.3.8");
expect(summary.heartbeat.defaultAgentId).toBe("main");
expect(summary.channelSummary).toEqual(["ok"]);
expect(summary.tasks.active).toBe(0);
});
it("skips channel summary imports when no channels are configured", async () => {
const { getStatusSummary } = await loadStatusSummaryForTest();
vi.mocked(hasPotentialConfiguredChannels).mockReturnValue(false);
const summary = await getStatusSummary();
@@ -104,6 +136,7 @@ describe("getStatusSummary", () => {
});
it("does not trigger async context warmup while building status summaries", async () => {
const { getStatusSummary, statusSummaryRuntime } = await loadStatusSummaryForTest();
await getStatusSummary();
expect(vi.mocked(statusSummaryRuntime.resolveContextTokensForModel)).toHaveBeenCalledWith(

View File

@@ -16,6 +16,9 @@ import type { HeartbeatStatus, SessionStatus, StatusSummary } from "./status.typ
let channelSummaryModulePromise: Promise<typeof import("../infra/channel-summary.js")> | undefined;
let linkChannelModulePromise: Promise<typeof import("./status.link-channel.js")> | undefined;
let configIoModulePromise: Promise<typeof import("../config/io.js")> | undefined;
let taskRegistryMaintenanceModulePromise:
| Promise<typeof import("../tasks/task-registry.maintenance.js")>
| undefined;
function loadChannelSummaryModule() {
channelSummaryModulePromise ??= import("../infra/channel-summary.js");
@@ -37,6 +40,11 @@ function loadConfigIoModule() {
return configIoModulePromise;
}
function loadTaskRegistryMaintenanceModule() {
taskRegistryMaintenanceModulePromise ??= import("../tasks/task-registry.maintenance.js");
return taskRegistryMaintenanceModulePromise;
}
const buildFlags = (entry?: SessionEntry): string[] => {
if (!entry) {
return [];
@@ -136,6 +144,7 @@ export async function getStatusSummary(
: [];
const mainSessionKey = resolveMainSessionKey(cfg);
const queuedSystemEvents = peekSystemEvents(mainSessionKey);
const tasks = (await loadTaskRegistryMaintenanceModule()).getInspectableTaskRegistrySummary();
const resolved = resolveConfiguredStatusModelRef({
cfg,
@@ -263,6 +272,7 @@ export async function getStatusSummary(
},
channelSummary,
queuedSystemEvents,
tasks,
sessions: {
paths: Array.from(paths),
count: totalSessions,

View File

@@ -556,6 +556,13 @@ describe("statusCommand", () => {
count: 0,
warnings: [],
});
expect(payload.tasks).toEqual(
expect.objectContaining({
total: 0,
active: 0,
byStatus: expect.objectContaining({ queued: 0, running: 0 }),
}),
);
expect(mocks.runSecurityAudit).toHaveBeenCalledWith(
expect.objectContaining({
includeFilesystem: true,
@@ -601,6 +608,7 @@ describe("statusCommand", () => {
"Channels",
"WhatsApp",
"bootstrap files",
"Tasks",
"Sessions",
"+1000",
"50%",

View File

@@ -1,4 +1,5 @@
import type { ChannelId } from "../channels/plugins/types.js";
import type { TaskRegistrySummary } from "../tasks/task-registry.types.js";
export type SessionStatus = {
agentId?: string;
@@ -48,6 +49,7 @@ export type StatusSummary = {
};
channelSummary: string[];
queuedSystemEvents: string[];
tasks: TaskRegistrySummary;
sessions: {
paths: string[];
count: number;

View File

@@ -80,6 +80,7 @@ describe("tasks commands", () => {
await tasksListCommand({ runtime: "acp", status: "running" }, runtime);
expect(runtimeLogs[0]).toContain("Background tasks: 1");
expect(runtimeLogs[1]).toContain("Task pressure: 0 queued · 1 running · 0 issues");
expect(runtimeLogs.join("\n")).toContain("No output for 60s. It may be waiting for input.");
});

View File

@@ -83,6 +83,11 @@ function formatTaskRows(tasks: TaskRecord[], rich: boolean) {
return lines;
}
function formatTaskListSummary(tasks: TaskRecord[]) {
const summary = summarizeTaskRecords(tasks);
return `${summary.byStatus.queued} queued · ${summary.byStatus.running} running · ${summary.failures} issues`;
}
export async function tasksListCommand(
opts: { json?: boolean; runtime?: string; status?: string },
runtime: RuntimeEnv,
@@ -116,6 +121,7 @@ export async function tasksListCommand(
}
runtime.log(info(`Background tasks: ${tasks.length}`));
runtime.log(info(`Task pressure: ${formatTaskListSummary(tasks)}`));
if (runtimeFilter) {
runtime.log(info(`Runtime filter: ${runtimeFilter}`));
}

View File

@@ -16,6 +16,7 @@ import {
} from "../infra/restart.js";
import { setCommandLaneConcurrency, getTotalQueueSize } from "../process/command-queue.js";
import { CommandLane } from "../process/lanes.js";
import { getInspectableTaskRegistrySummary } from "../tasks/task-registry.maintenance.js";
import type { ChannelHealthMonitor } from "./channel-health-monitor.js";
import type { ChannelKind } from "./config-reload-plan.js";
import type { GatewayReloadPlan } from "./config-reload.js";
@@ -167,11 +168,13 @@ export function createGatewayReloadHandlers(params: {
const queueSize = getTotalQueueSize();
const pendingReplies = getTotalPendingReplies();
const embeddedRuns = getActiveEmbeddedRunCount();
const activeTasks = getInspectableTaskRegistrySummary().active;
return {
queueSize,
pendingReplies,
embeddedRuns,
totalActive: queueSize + pendingReplies + embeddedRuns,
activeTasks,
totalActive: queueSize + pendingReplies + embeddedRuns + activeTasks,
};
};
const formatActiveDetails = (counts: ReturnType<typeof getActiveCounts>) => {
@@ -185,6 +188,9 @@ export function createGatewayReloadHandlers(params: {
if (counts.embeddedRuns > 0) {
details.push(`${counts.embeddedRuns} embedded run(s)`);
}
if (counts.activeTasks > 0) {
details.push(`${counts.activeTasks} task run(s)`);
}
return details;
};
const active = getActiveCounts();

View File

@@ -75,7 +75,10 @@ import {
} from "../secrets/runtime.js";
import { onSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
import { startTaskRegistryMaintenance } from "../tasks/task-registry.maintenance.js";
import {
getInspectableTaskRegistrySummary,
startTaskRegistryMaintenance,
} from "../tasks/task-registry.maintenance.js";
import { runSetupWizard } from "../wizard/setup.js";
import { createAuthRateLimiter, type AuthRateLimiter } from "./auth-rate-limit.js";
import { startChannelHealthMonitor } from "./channel-health-monitor.js";
@@ -524,7 +527,11 @@ export async function startGatewayServer(
}
setGatewaySigusr1RestartPolicy({ allowExternal: isRestartEnabled(cfgAtStart) });
setPreRestartDeferralCheck(
() => getTotalQueueSize() + getTotalPendingReplies() + getActiveEmbeddedRunCount(),
() =>
getTotalQueueSize() +
getTotalPendingReplies() +
getActiveEmbeddedRunCount() +
getInspectableTaskRegistrySummary().active,
);
// Unconditional startup migration: seed gateway.controlUi.allowedOrigins for existing
// non-loopback installs that upgraded to v2026.2.26+ without required origins.

View File

@@ -10,7 +10,8 @@ import {
resolveTaskForLookupToken,
updateTaskRecordById,
} from "./task-registry.js";
import type { TaskRecord } from "./task-registry.types.js";
import { summarizeTaskRecords } from "./task-registry.summary.js";
import type { TaskRecord, TaskRegistrySummary } from "./task-registry.types.js";
const TASK_RECONCILE_GRACE_MS = 5 * 60_000;
const TASK_RETENTION_MS = 7 * 24 * 60 * 60_000;
@@ -124,6 +125,10 @@ export function reconcileInspectableTasks(): TaskRecord[] {
return listTaskRecords().map((task) => reconcileTaskRecordForOperatorInspection(task));
}
export function getInspectableTaskRegistrySummary(): TaskRegistrySummary {
return summarizeTaskRecords(reconcileInspectableTasks());
}
export function reconcileTaskLookupToken(token: string): TaskRecord | undefined {
ensureTaskRegistryReady();
const task = resolveTaskForLookupToken(token);

View File

@@ -0,0 +1,56 @@
import type {
TaskRecord,
TaskRegistrySummary,
TaskRuntimeCounts,
TaskStatusCounts,
} from "./task-registry.types.js";
function createEmptyTaskStatusCounts(): TaskStatusCounts {
return {
queued: 0,
running: 0,
succeeded: 0,
failed: 0,
timed_out: 0,
cancelled: 0,
lost: 0,
};
}
function createEmptyTaskRuntimeCounts(): TaskRuntimeCounts {
return {
subagent: 0,
acp: 0,
cli: 0,
cron: 0,
};
}
export function createEmptyTaskRegistrySummary(): TaskRegistrySummary {
return {
total: 0,
active: 0,
terminal: 0,
failures: 0,
byStatus: createEmptyTaskStatusCounts(),
byRuntime: createEmptyTaskRuntimeCounts(),
};
}
export function summarizeTaskRecords(records: Iterable<TaskRecord>): TaskRegistrySummary {
const summary = createEmptyTaskRegistrySummary();
for (const task of records) {
summary.total += 1;
summary.byStatus[task.status] += 1;
summary.byRuntime[task.runtime] += 1;
if (task.status === "queued" || task.status === "running") {
summary.active += 1;
} else {
summary.terminal += 1;
}
if (task.status === "failed" || task.status === "timed_out" || task.status === "lost") {
summary.failures += 1;
}
}
return summary;
}

View File

@@ -11,6 +11,7 @@ import {
createTaskRecord,
findTaskByRunId,
getTaskById,
getTaskRegistrySummary,
listTaskRecords,
maybeDeliverTaskStateChangeUpdate,
maybeDeliverTaskTerminalUpdate,
@@ -141,6 +142,60 @@ describe("task-registry", () => {
});
});
it("summarizes task pressure by status and runtime", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
runId: "run-summary-acp",
task: "Investigate issue",
status: "queued",
deliveryStatus: "pending",
});
createTaskRecord({
runtime: "cron",
requesterSessionKey: "",
runId: "run-summary-cron",
task: "Daily digest",
status: "running",
deliveryStatus: "not_applicable",
});
createTaskRecord({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
runId: "run-summary-subagent",
task: "Write patch",
status: "timed_out",
deliveryStatus: "session_queued",
});
expect(getTaskRegistrySummary()).toEqual({
total: 3,
active: 2,
terminal: 1,
failures: 1,
byStatus: {
queued: 1,
running: 1,
succeeded: 0,
failed: 0,
timed_out: 1,
cancelled: 0,
lost: 0,
},
byRuntime: {
subagent: 1,
acp: 1,
cli: 0,
cron: 1,
},
});
});
});
it("delivers ACP completion to the requester channel when a delivery origin exists", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;

View File

@@ -15,12 +15,14 @@ import {
resetTaskRegistryRuntimeForTests,
type TaskRegistryHookEvent,
} from "./task-registry.store.js";
import { summarizeTaskRecords } from "./task-registry.summary.js";
import type {
TaskDeliveryStatus,
TaskEventKind,
TaskEventRecord,
TaskNotifyPolicy,
TaskRecord,
TaskRegistrySummary,
TaskRegistrySnapshot,
TaskRuntime,
TaskStatus,
@@ -1049,6 +1051,11 @@ export function listTaskRecords(): TaskRecord[] {
.toSorted((a, b) => b.createdAt - a.createdAt);
}
export function getTaskRegistrySummary(): TaskRegistrySummary {
ensureTaskRegistryReady();
return summarizeTaskRecords(tasks.values());
}
export function getTaskRegistrySnapshot(): TaskRegistrySnapshot {
return {
tasks: listTaskRecords(),

View File

@@ -23,6 +23,18 @@ export type TaskNotifyPolicy = "done_only" | "state_changes" | "silent";
export type TaskTerminalOutcome = "succeeded" | "blocked";
export type TaskStatusCounts = Record<TaskStatus, number>;
export type TaskRuntimeCounts = Record<TaskRuntime, number>;
export type TaskRegistrySummary = {
total: number;
active: number;
terminal: number;
failures: number;
byStatus: TaskStatusCounts;
byRuntime: TaskRuntimeCounts;
};
export type TaskEventKind = TaskStatus | "progress";
export type TaskEventRecord = {