perf(tasks): optimize session lookups and sqlite upserts

This commit is contained in:
Vincent Koc
2026-03-30 14:34:48 +09:00
parent 89dbaa87aa
commit 0a014ca63a
10 changed files with 261 additions and 13 deletions

View File

@@ -8,6 +8,9 @@ const callGatewayMock = vi.fn();
const loadCombinedSessionStoreForGatewayMock = vi.fn();
const buildStatusMessageMock = vi.hoisted(() => vi.fn(() => "OpenClaw\n🧠 Model: GPT-5.4"));
const resolveQueueSettingsMock = vi.hoisted(() => vi.fn(() => ({ mode: "interrupt" })));
const listTasksForSessionKeyMock = vi.hoisted(() =>
vi.fn((_: string) => [] as Array<Record<string, unknown>>),
);
const createMockConfig = () => ({
session: { mainKey: "main", scope: "per-sender" },
@@ -189,6 +192,9 @@ async function loadFreshOpenClawToolsForSessionStatusTest() {
vi.doMock("../auto-reply/status.js", () => ({
buildStatusMessage: buildStatusMessageMock,
}));
vi.doMock("../tasks/task-registry.js", () => ({
listTasksForSessionKey: (sessionKey: string) => listTasksForSessionKeyMock(sessionKey),
}));
({ createSessionStatusTool } = await import("./tools/session-status-tool.js"));
}
@@ -200,6 +206,8 @@ function resetSessionStore(store: Record<string, SessionEntry>) {
updateSessionStoreMock.mockClear();
callGatewayMock.mockClear();
loadCombinedSessionStoreForGatewayMock.mockClear();
listTasksForSessionKeyMock.mockClear();
listTasksForSessionKeyMock.mockReturnValue([]);
loadSessionStoreMock.mockReturnValue(store);
loadCombinedSessionStoreForGatewayMock.mockReturnValue({
storePath: "(multiple)",
@@ -375,6 +383,38 @@ describe("session_status tool", () => {
expect(details.sessionKey).toBe("agent:main:current");
});
it("includes background task context in session_status output", async () => {
resetSessionStore({
"agent:main:main": {
sessionId: "sess-main",
updatedAt: Date.now(),
},
});
listTasksForSessionKeyMock.mockReturnValue([
{
taskId: "task-1",
runtime: "acp",
requesterSessionKey: "agent:main:main",
task: "Summarize inbox backlog",
status: "running",
deliveryStatus: "pending",
notifyPolicy: "done_only",
createdAt: Date.now() - 5_000,
progressSummary: "Indexing the latest threads",
},
]);
const tool = createSessionStatusTool({ agentSessionKey: "agent:main:main" });
const result = await tool.execute("tc-1", { sessionKey: "agent:main:main" });
const firstContent = result.content?.[0];
const text = (firstContent as { text: string } | undefined)?.text ?? "";
expect(text).toContain("📌 Tasks: 1 active");
expect(text).toContain("acp");
expect(text).toContain("Summarize inbox backlog");
expect(text).toContain("Indexing the latest threads");
});
it("resolves a literal current sessionId in session_status", async () => {
resetSessionStore({
main: {

View File

@@ -216,6 +216,8 @@ export function createCronTool(opts?: CronToolOptions, deps?: CronToolDeps): Any
displaySummary: "Schedule and manage cron jobs and wake events.",
description: `Manage Gateway cron jobs (status/list/add/update/remove/run/runs) and send wake events.
Main-session cron jobs enqueue system events for heartbeat handling. Isolated cron jobs create background task runs that appear in \`openclaw tasks\`.
ACTIONS:
- status: Check cron scheduler status
- list: List jobs (use includeDisabled:true to include disabled)

View File

@@ -23,6 +23,7 @@ import {
resolveAgentIdFromSessionKey,
} from "../../routing/session-key.js";
import { applyModelOverrideToSessionEntry } from "../../sessions/model-overrides.js";
import { listTasksForSessionKey } from "../../tasks/task-registry.js";
import { resolveAgentConfig, resolveAgentDir } from "../agent-scope.js";
import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../date-time.js";
import { resolveModelAuthLabel } from "../model-auth-label.js";
@@ -118,6 +119,33 @@ function resolveStoreScopedRequesterKey(params: {
return parsed.rest === params.mainKey ? params.mainKey : params.requesterKey;
}
function formatSessionTaskLine(sessionKey: string): string | undefined {
const tasks = listTasksForSessionKey(sessionKey);
if (tasks.length === 0) {
return undefined;
}
const latest = tasks[0];
const active = tasks.filter(
(task) => task.status === "queued" || task.status === "running",
).length;
const failed = tasks.filter(
(task) => task.status === "failed" || task.status === "timed_out" || task.status === "lost",
).length;
const headline =
active > 0
? `${active} active`
: failed > 0
? `${failed} recent failure${failed === 1 ? "" : "s"}`
: `latest ${latest.status.replaceAll("_", " ")}`;
const title = latest.label?.trim() || latest.task.trim();
const detail =
latest.status === "running" || latest.status === "queued"
? latest.progressSummary?.trim()
: latest.error?.trim() || latest.terminalSummary?.trim();
const parts = [headline, latest.runtime, title, detail].filter(Boolean);
return parts.length ? `📌 Tasks: ${parts.join(" · ")}` : undefined;
}
async function resolveModelOverride(params: {
cfg: OpenClawConfig;
raw: string;
@@ -191,7 +219,7 @@ export function createSessionStatusTool(opts?: {
label: "Session Status",
name: "session_status",
description:
"Show a /status-equivalent session status card (usage + time + cost when available). Use for model-use questions (📊 session_status). Optional: set per-session model override (model=default resets overrides).",
"Show a /status-equivalent session status card (usage + time + cost when available), including linked background task context when present. Use for model-use questions (📊 session_status). Optional: set per-session model override (model=default resets overrides).",
parameters: SessionStatusToolSchema,
execute: async (_toolCallId, args) => {
const params = args as Record<string, unknown>;
@@ -537,14 +565,16 @@ export function createSessionStatusTool(opts?: {
},
includeTranscriptUsage: true,
});
const taskLine = formatSessionTaskLine(resolved.key);
const fullStatusText = taskLine ? `${statusText}\n${taskLine}` : statusText;
return {
content: [{ type: "text", text: statusText }],
content: [{ type: "text", text: fullStatusText }],
details: {
ok: true,
sessionKey: resolved.key,
changedModel,
statusText,
statusText: fullStatusText,
},
};
},

View File

@@ -111,6 +111,8 @@ const { handleAcpCommand } = await import("./commands-acp.js");
const { buildCommandTestParams } = await import("./commands-spawn.test-harness.js");
const { __testing: acpManagerTesting } = await import("../../acp/control-plane/manager.js");
const { __testing: acpResetTargetTesting } = await import("./acp-reset-target.js");
const { createTaskRecord, resetTaskRegistryForTests } =
await import("../../tasks/task-registry.js");
function parseTelegramChatIdForTest(raw?: string | null): string | undefined {
const trimmed = raw?.trim().replace(/^telegram:/i, "");
@@ -675,6 +677,7 @@ describe("/acp command", () => {
beforeEach(() => {
setMinimalAcpCommandRegistryForTests();
acpManagerTesting.resetAcpSessionManagerForTests();
resetTaskRegistryForTests();
acpResetTargetTesting.setDepsForTest({
getSessionBindingService: () => createAcpCommandSessionBindingService() as never,
});
@@ -1454,12 +1457,23 @@ describe("/acp command", () => {
lastUpdatedAt: Date.now(),
},
});
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: defaultAcpSessionKey,
runId: "acp-run-1",
task: "Inspect ACP backlog",
status: "running",
progressSummary: "Fetching the latest runtime state",
});
const result = await runThreadAcpCommand("/acp status", baseCfg);
expect(result?.reply?.text).toContain("ACP status:");
expect(result?.reply?.text).toContain(`session: ${defaultAcpSessionKey}`);
expect(result?.reply?.text).toContain("agent session id: codex-sid-1");
expect(result?.reply?.text).toContain("acpx session id: acpx-sid-1");
expect(result?.reply?.text).toContain("taskStatus: running");
expect(result?.reply?.text).toContain("taskProgress: Fetching the latest runtime state");
expect(result?.reply?.text).toContain("capabilities:");
expect(hoisted.getStatusMock).toHaveBeenCalledTimes(1);
});

View File

@@ -142,6 +142,14 @@ export async function handleAcpStatusAction(
`taskId: ${linkedTask.taskId}`,
`taskStatus: ${linkedTask.status}`,
`delivery: ${linkedTask.deliveryStatus}`,
...(linkedTask.progressSummary
? [`taskProgress: ${linkedTask.progressSummary}`]
: []),
...(linkedTask.terminalSummary ? [`taskSummary: ${linkedTask.terminalSummary}`] : []),
...(linkedTask.error ? [`taskError: ${linkedTask.error}`] : []),
...(typeof linkedTask.lastEventAt === "number"
? [`taskUpdatedAt: ${new Date(linkedTask.lastEventAt).toISOString()}`]
: []),
]
: []),
`runtimeOptions: ${formatRuntimeOptionsText(status.runtimeOptions)}`,

View File

@@ -46,6 +46,7 @@ export function handleSubagentsInfoAction(ctx: SubagentsCommandContext): Command
`Task: ${run.task}`,
`Run: ${run.runId}`,
linkedTask ? `TaskId: ${linkedTask.taskId}` : undefined,
linkedTask ? `TaskStatus: ${linkedTask.status}` : undefined,
`Session: ${run.childSessionKey}`,
`SessionId: ${sessionEntry?.sessionId ?? "n/a"}`,
`Transcript: ${sessionEntry?.sessionFile ?? "n/a"}`,
@@ -57,6 +58,9 @@ export function handleSubagentsInfoAction(ctx: SubagentsCommandContext): Command
run.archiveAtMs ? `Archive: ${formatTimestampWithAge(run.archiveAtMs)}` : undefined,
run.cleanupHandled ? "Cleanup handled: yes" : undefined,
`Outcome: ${outcome}`,
linkedTask?.progressSummary ? `Progress: ${linkedTask.progressSummary}` : undefined,
linkedTask?.terminalSummary ? `Task summary: ${linkedTask.terminalSummary}` : undefined,
linkedTask?.error ? `Task error: ${linkedTask.error}` : undefined,
linkedTask ? `Delivery: ${linkedTask.deliveryStatus}` : undefined,
].filter(Boolean);

View File

@@ -130,6 +130,8 @@ const { parseConfigCommand } = await import("./config-commands.js");
const { parseDebugCommand } = await import("./debug-commands.js");
const { parseInlineDirectives } = await import("./directive-handling.js");
const { buildCommandContext, handleCommands } = await import("./commands.js");
const { createTaskRecord, resetTaskRegistryForTests } =
await import("../../tasks/task-registry.js");
let testWorkspaceDir = os.tmpdir();
@@ -150,6 +152,7 @@ afterAll(async () => {
beforeEach(() => {
vi.useRealTimers();
vi.clearAllTimers();
resetTaskRegistryForTests();
setDefaultChannelPluginRegistryForTests();
readConfigFileSnapshotMock.mockImplementation(async () => {
const configPath = process.env.OPENCLAW_CONFIG_PATH;
@@ -2743,6 +2746,16 @@ describe("handleCommands subagents", () => {
endedAt: now - 1_000,
outcome: { status: "ok" },
});
createTaskRecord({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:main:subagent:abc",
runId: "run-1",
task: "do thing",
status: "succeeded",
terminalSummary: "Completed the requested task",
deliveryStatus: "delivered",
});
const cfg = {
commands: { text: true },
channels: { whatsapp: { allowFrom: ["*"] } },
@@ -2754,6 +2767,8 @@ describe("handleCommands subagents", () => {
expect(result.reply?.text).toContain("Subagent info");
expect(result.reply?.text).toContain("Run: run-1");
expect(result.reply?.text).toContain("Status: done");
expect(result.reply?.text).toContain("TaskStatus: succeeded");
expect(result.reply?.text).toContain("Task summary: Completed the requested task");
});
it("does not resolve moved child rows from a stale older parent", async () => {

View File

@@ -40,7 +40,7 @@ type TaskDeliveryStateRow = {
type TaskRegistryStatements = {
selectAll: StatementSync;
selectAllDeliveryStates: StatementSync;
replaceRow: StatementSync;
upsertRow: StatementSync;
replaceDeliveryState: StatementSync;
deleteRow: StatementSync;
deleteDeliveryState: StatementSync;
@@ -186,6 +186,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
FROM task_runs
ORDER BY created_at ASC, task_id ASC
`),
<<<<<<< HEAD
selectAllDeliveryStates: db.prepare(`
SELECT
task_id,
@@ -194,8 +195,8 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
FROM task_delivery_state
ORDER BY task_id ASC
`),
replaceRow: db.prepare(`
INSERT OR REPLACE INTO task_runs (
upsertRow: db.prepare(`
INSERT INTO task_runs (
task_id,
runtime,
source_id,
@@ -242,6 +243,28 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
@terminal_summary,
@terminal_outcome
)
ON CONFLICT(task_id) DO UPDATE SET
runtime = excluded.runtime,
source_id = excluded.source_id,
requester_session_key = excluded.requester_session_key,
child_session_key = excluded.child_session_key,
parent_task_id = excluded.parent_task_id,
agent_id = excluded.agent_id,
run_id = excluded.run_id,
label = excluded.label,
task = excluded.task,
status = excluded.status,
delivery_status = excluded.delivery_status,
notify_policy = excluded.notify_policy,
created_at = excluded.created_at,
started_at = excluded.started_at,
ended_at = excluded.ended_at,
last_event_at = excluded.last_event_at,
cleanup_after = excluded.cleanup_after,
error = excluded.error,
progress_summary = excluded.progress_summary,
terminal_summary = excluded.terminal_summary,
terminal_outcome = excluded.terminal_outcome
`),
replaceDeliveryState: db.prepare(`
INSERT OR REPLACE INTO task_delivery_state (
@@ -371,7 +394,7 @@ export function saveTaskRegistryStateToSqlite(snapshot: TaskRegistryStoreSnapsho
statements.clearDeliveryStates.run();
statements.clearRows.run();
for (const task of snapshot.tasks.values()) {
statements.replaceRow.run(bindTaskRecord(task));
statements.upsertRow.run(bindTaskRecord(task));
}
for (const state of snapshot.deliveryStates.values()) {
statements.replaceDeliveryState.run(bindTaskDeliveryState(state));
@@ -381,7 +404,7 @@ export function saveTaskRegistryStateToSqlite(snapshot: TaskRegistryStoreSnapsho
export function upsertTaskRegistryRecordToSqlite(task: TaskRecord) {
const store = openTaskRegistryDatabase();
store.statements.replaceRow.run(bindTaskRecord(task));
store.statements.upsertRow.run(bindTaskRecord(task));
ensureTaskRegistryPermissions(store.path);
}

View File

@@ -9,9 +9,11 @@ import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-even
import { withTempDir } from "../test-helpers/temp-dir.js";
import {
createTaskRecord,
findLatestTaskForSessionKey,
findTaskByRunId,
getTaskById,
getTaskRegistrySummary,
listTasksForSessionKey,
listTaskRecords,
maybeDeliverTaskStateChangeUpdate,
maybeDeliverTaskTerminalUpdate,
@@ -789,6 +791,35 @@ describe("task-registry", () => {
});
});
it("indexes tasks by session key for latest and list lookups", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests({ persist: false });
const older = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:main:subagent:child-1",
runId: "run-session-lookup-1",
task: "Older task",
});
const latest = createTaskRecord({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:main:subagent:child-2",
runId: "run-session-lookup-2",
task: "Latest task",
});
expect(findLatestTaskForSessionKey("agent:main:main")?.taskId).toBe(latest.taskId);
expect(listTasksForSessionKey("agent:main:main").map((task) => task.taskId)).toEqual([
latest.taskId,
older.taskId,
]);
expect(findLatestTaskForSessionKey("agent:main:subagent:child-1")?.taskId).toBe(older.taskId);
});
});
it("projects inspection-time orphaned tasks as lost without mutating the registry", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;

View File

@@ -45,6 +45,7 @@ const DEFAULT_TASK_RETENTION_MS = 7 * 24 * 60 * 60_000;
const tasks = new Map<string, TaskRecord>();
const taskDeliveryStates = new Map<string, TaskDeliveryState>();
const taskIdsByRunId = new Map<string, Set<string>>();
const taskIdsBySessionKey = new Map<string, Set<string>>();
const tasksWithPendingDelivery = new Set<string>();
let listenerStarted = false;
let listenerStop: (() => void) | null = null;
@@ -218,6 +219,54 @@ function addRunIdIndex(taskId: string, runId?: string) {
ids.add(taskId);
}
function normalizeSessionIndexKey(sessionKey?: string): string | undefined {
const trimmed = sessionKey?.trim();
return trimmed ? trimmed : undefined;
}
function getTaskSessionIndexKeys(
task: Pick<TaskRecord, "requesterSessionKey" | "childSessionKey">,
) {
return [
...new Set(
[
normalizeSessionIndexKey(task.requesterSessionKey),
normalizeSessionIndexKey(task.childSessionKey),
].filter(Boolean) as string[],
),
];
}
function addSessionKeyIndex(
taskId: string,
task: Pick<TaskRecord, "requesterSessionKey" | "childSessionKey">,
) {
for (const sessionKey of getTaskSessionIndexKeys(task)) {
let ids = taskIdsBySessionKey.get(sessionKey);
if (!ids) {
ids = new Set<string>();
taskIdsBySessionKey.set(sessionKey, ids);
}
ids.add(taskId);
}
}
function deleteSessionKeyIndex(
taskId: string,
task: Pick<TaskRecord, "requesterSessionKey" | "childSessionKey">,
) {
for (const sessionKey of getTaskSessionIndexKeys(task)) {
const ids = taskIdsBySessionKey.get(sessionKey);
if (!ids) {
continue;
}
ids.delete(taskId);
if (ids.size === 0) {
taskIdsBySessionKey.delete(sessionKey);
}
}
}
function rebuildRunIdIndex() {
taskIdsByRunId.clear();
for (const [taskId, task] of tasks.entries()) {
@@ -225,6 +274,13 @@ function rebuildRunIdIndex() {
}
}
function rebuildSessionKeyIndex() {
taskIdsBySessionKey.clear();
for (const [taskId, task] of tasks.entries()) {
addSessionKeyIndex(taskId, task);
}
}
function getTasksByRunId(runId: string): TaskRecord[] {
const ids = taskIdsByRunId.get(runId.trim());
if (!ids || ids.size === 0) {
@@ -379,6 +435,7 @@ function restoreTaskRegistryOnce() {
taskDeliveryStates.set(taskId, state);
}
rebuildRunIdIndex();
rebuildSessionKeyIndex();
emitTaskRegistryHookEvent(() => ({
kind: "restored",
tasks: snapshotTaskRecords(tasks),
@@ -403,10 +460,19 @@ function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | nu
const terminalAt = next.endedAt ?? next.lastEventAt ?? Date.now();
next.cleanupAfter = terminalAt + DEFAULT_TASK_RETENTION_MS;
}
const sessionIndexChanged =
normalizeSessionIndexKey(current.requesterSessionKey) !==
normalizeSessionIndexKey(next.requesterSessionKey) ||
normalizeSessionIndexKey(current.childSessionKey) !==
normalizeSessionIndexKey(next.childSessionKey);
tasks.set(taskId, next);
if (patch.runId && patch.runId !== current.runId) {
rebuildRunIdIndex();
}
if (sessionIndexChanged) {
deleteSessionKeyIndex(taskId, current);
addSessionKeyIndex(taskId, next);
}
persistTaskUpsert(next);
emitTaskRegistryHookEvent(() => ({
kind: "upserted",
@@ -900,6 +966,7 @@ export function createTaskRecord(params: {
requesterOrigin: normalizeDeliveryContext(params.requesterOrigin),
});
addRunIdIndex(taskId, record.runId);
addSessionKeyIndex(taskId, record);
persistTaskUpsert(record);
emitTaskRegistryHookEvent(() => ({
kind: "upserted",
@@ -1190,13 +1257,25 @@ export function findTaskByRunId(runId: string): TaskRecord | undefined {
}
export function findLatestTaskForSessionKey(sessionKey: string): TaskRecord | undefined {
const key = sessionKey.trim();
const task = listTasksForSessionKey(sessionKey)[0];
return task ? cloneTaskRecord(task) : undefined;
}
export function listTasksForSessionKey(sessionKey: string): TaskRecord[] {
ensureTaskRegistryReady();
const key = normalizeSessionIndexKey(sessionKey);
if (!key) {
return undefined;
return [];
}
return listTaskRecords().find(
(task) => task.childSessionKey === key || task.requesterSessionKey === key,
);
const ids = taskIdsBySessionKey.get(key);
if (!ids || ids.size === 0) {
return [];
}
return [...ids]
.map((taskId) => tasks.get(taskId))
.filter((task): task is TaskRecord => Boolean(task))
.toSorted((left, right) => right.createdAt - left.createdAt)
.map((task) => cloneTaskRecord(task));
}
export function resolveTaskForLookupToken(token: string): TaskRecord | undefined {
@@ -1213,6 +1292,7 @@ export function deleteTaskRecordById(taskId: string): boolean {
if (!current) {
return false;
}
deleteSessionKeyIndex(taskId, current);
tasks.delete(taskId);
taskDeliveryStates.delete(taskId);
rebuildRunIdIndex();
@@ -1230,6 +1310,7 @@ export function resetTaskRegistryForTests(opts?: { persist?: boolean }) {
tasks.clear();
taskDeliveryStates.clear();
taskIdsByRunId.clear();
taskIdsBySessionKey.clear();
tasksWithPendingDelivery.clear();
restoreAttempted = false;
resetTaskRegistryRuntimeForTests();