From c7106c4285b6f626336ef438088e622dbcc3beb4 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 30 Mar 2026 12:49:10 +0900 Subject: [PATCH] refactor(tasks): replace generic task mutation api --- src/cron/service/ops.test.ts | 2 +- src/cron/service/ops.ts | 5 +- src/cron/service/timer.ts | 5 +- src/tasks/task-registry.maintenance.ts | 10 +-- src/tasks/task-registry.test.ts | 15 ++-- src/tasks/task-registry.ts | 94 ++++++++++++++++++++++++-- 6 files changed, 112 insertions(+), 19 deletions(-) diff --git a/src/cron/service/ops.test.ts b/src/cron/service/ops.test.ts index 24562e0c55f..b2a1f143cd2 100644 --- a/src/cron/service/ops.test.ts +++ b/src/cron/service/ops.test.ts @@ -210,7 +210,7 @@ describe("cron service ops seam coverage", () => { }); const updateTaskRecordSpy = vi - .spyOn(taskRegistry, "updateTaskRecordById") + .spyOn(taskRegistry, "markTaskTerminalById") .mockImplementation(() => { throw new Error("disk full"); }); diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index 47420bf9558..97dea355a2d 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -1,6 +1,6 @@ import { enqueueCommandInLane } from "../../process/command-queue.js"; import { CommandLane } from "../../process/lanes.js"; -import { createTaskRecord, updateTaskRecordById } from "../../tasks/task-registry.js"; +import { createTaskRecord, markTaskTerminalById } from "../../tasks/task-registry.js"; import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js"; import { normalizeCronCreateDeliveryInput } from "./initial-delivery.js"; import { @@ -424,7 +424,8 @@ function tryUpdateManualTaskRecord( return; } try { - updateTaskRecordById(params.taskId, { + markTaskTerminalById({ + taskId: params.taskId, status: params.coreResult.status === "ok" || params.coreResult.status === "skipped" ? "succeeded" diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 4b78dcd48d2..d145fa8f842 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -2,7 +2,7 @@ import { resolveFailoverReasonFromError } from "../../agents/failover-error.js"; import type { CronConfig, CronRetryOn } from "../../config/types.cron.js"; import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; import { DEFAULT_AGENT_ID } from "../../routing/session-key.js"; -import { createTaskRecord, updateTaskRecordById } from "../../tasks/task-registry.js"; +import { createTaskRecord, markTaskTerminalById } from "../../tasks/task-registry.js"; import { resolveCronDeliveryPlan } from "../delivery.js"; import { sweepCronRunSessions } from "../session-reaper.js"; import type { @@ -158,7 +158,8 @@ function tryUpdateCronTaskRecord( return; } try { - updateTaskRecordById(result.taskId, { + markTaskTerminalById({ + taskId: result.taskId, status: result.status === "ok" || result.status === "skipped" ? "succeeded" diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index a013dfd4317..f639e122172 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -8,9 +8,10 @@ import { ensureTaskRegistryReady, getTaskById, listTaskRecords, + markTaskLostById, maybeDeliverTaskTerminalUpdate, resolveTaskForLookupToken, - updateTaskRecordById, + setTaskCleanupAfterById, } from "./task-registry.js"; import { summarizeTaskRecords } from "./task-registry.summary.js"; import type { TaskRecord, TaskRegistrySummary } from "./task-registry.types.js"; @@ -110,8 +111,8 @@ function resolveCleanupAfter(task: TaskRecord): number { function markTaskLost(task: TaskRecord, now: number): TaskRecord { const cleanupAfter = task.cleanupAfter ?? projectTaskLost(task, now).cleanupAfter; const updated = - updateTaskRecordById(task.taskId, { - status: "lost", + markTaskLostById({ + taskId: task.taskId, endedAt: task.endedAt ?? now, lastEventAt: now, error: task.error ?? "backing session missing", @@ -207,7 +208,8 @@ export function runTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary { } if ( shouldStampCleanupAfter(task) && - updateTaskRecordById(task.taskId, { + setTaskCleanupAfterById({ + taskId: task.taskId, cleanupAfter: resolveCleanupAfter(task), }) ) { diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index 77ac45e58d0..0a540745ea4 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -17,8 +17,9 @@ import { maybeDeliverTaskTerminalUpdate, resetTaskRegistryForTests, resolveTaskForLookupToken, + setTaskProgressById, + setTaskTimingById, updateTaskNotifyPolicyById, - updateTaskRecordById, updateTaskStateByRunId, } from "./task-registry.js"; import { @@ -440,7 +441,8 @@ describe("task-registry", () => { startedAt: 100, }); - updateTaskRecordById(findTaskByRunId("run-detail-leak")!.taskId, { + setTaskProgressById({ + taskId: findTaskByRunId("run-detail-leak")!.taskId, progressSummary: "I am loading the local session context and checking helper command availability before writing the file.", }); @@ -800,7 +802,8 @@ describe("task-registry", () => { status: "running", deliveryStatus: "pending", }); - updateTaskRecordById(task.taskId, { + setTaskTimingById({ + taskId: task.taskId, lastEventAt: Date.now() - 10 * 60_000, }); @@ -832,7 +835,8 @@ describe("task-registry", () => { status: "running", deliveryStatus: "pending", }); - updateTaskRecordById(task.taskId, { + setTaskTimingById({ + taskId: task.taskId, lastEventAt: now - 10 * 60_000, }); @@ -864,7 +868,8 @@ describe("task-registry", () => { deliveryStatus: "not_applicable", startedAt: Date.now() - 9 * 24 * 60 * 60_000, }); - updateTaskRecordById(task.taskId, { + setTaskTimingById({ + taskId: task.taskId, endedAt: Date.now() - 8 * 24 * 60 * 60_000, lastEventAt: Date.now() - 8 * 24 * 60 * 60_000, }); diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index 0c621e1dc9d..a2aa43c97b2 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -686,12 +686,96 @@ export async function maybeDeliverTaskStateChangeUpdate( } } -export function updateTaskRecordById( - taskId: string, - patch: Partial, -): TaskRecord | null { +export function setTaskProgressById(params: { + taskId: string; + progressSummary?: string | null; + lastEventAt?: number; +}): TaskRecord | null { ensureTaskRegistryReady(); - return updateTask(taskId, patch); + const patch: Partial = {}; + if (params.progressSummary !== undefined) { + patch.progressSummary = normalizeTaskSummary(params.progressSummary); + } + if (params.lastEventAt != null) { + patch.lastEventAt = params.lastEventAt; + } + return updateTask(params.taskId, patch); +} + +export function setTaskTimingById(params: { + taskId: string; + startedAt?: number; + endedAt?: number; + lastEventAt?: number; +}): TaskRecord | null { + ensureTaskRegistryReady(); + const patch: Partial = {}; + if (params.startedAt != null) { + patch.startedAt = params.startedAt; + } + if (params.endedAt != null) { + patch.endedAt = params.endedAt; + } + if (params.lastEventAt != null) { + patch.lastEventAt = params.lastEventAt; + } + return updateTask(params.taskId, patch); +} + +export function setTaskCleanupAfterById(params: { + taskId: string; + cleanupAfter: number; +}): TaskRecord | null { + ensureTaskRegistryReady(); + return updateTask(params.taskId, { + cleanupAfter: params.cleanupAfter, + }); +} + +export function markTaskTerminalById(params: { + taskId: string; + status: Extract; + endedAt: number; + lastEventAt?: number; + error?: string; + terminalSummary?: string | null; + terminalOutcome?: TaskTerminalOutcome | null; +}): TaskRecord | null { + ensureTaskRegistryReady(); + return updateTask(params.taskId, { + status: params.status, + endedAt: params.endedAt, + lastEventAt: params.lastEventAt ?? params.endedAt, + ...(params.error !== undefined ? { error: params.error } : {}), + ...(params.terminalSummary !== undefined + ? { terminalSummary: normalizeTaskSummary(params.terminalSummary) } + : {}), + ...(params.terminalOutcome !== undefined + ? { + terminalOutcome: resolveTaskTerminalOutcome({ + status: params.status, + terminalOutcome: params.terminalOutcome, + }), + } + : {}), + }); +} + +export function markTaskLostById(params: { + taskId: string; + endedAt: number; + lastEventAt?: number; + error?: string; + cleanupAfter?: number; +}): TaskRecord | null { + ensureTaskRegistryReady(); + return updateTask(params.taskId, { + status: "lost", + endedAt: params.endedAt, + lastEventAt: params.lastEventAt ?? params.endedAt, + ...(params.error !== undefined ? { error: params.error } : {}), + ...(params.cleanupAfter !== undefined ? { cleanupAfter: params.cleanupAfter } : {}), + }); } function updateTasksByRunId(runId: string, patch: Partial): TaskRecord[] {