From f0517db70b5b8cb7ff3ccac66623eca141c7a928 Mon Sep 17 00:00:00 2001 From: Josh Lehman Date: Sun, 12 Apr 2026 15:43:08 -0700 Subject: [PATCH] fix: route deferred maintenance through task seams --- .../context-engine-maintenance.ts | 55 ++++++++++++------- src/tasks/task-owner-access.ts | 45 ++++++++++++++- 2 files changed, 80 insertions(+), 20 deletions(-) diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.ts index a0d01c447ca..5db1678659d 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.ts @@ -13,14 +13,14 @@ import { createQueuedTaskRun, failTaskRunByRunId, recordTaskRunProgressByRunId, + setDetachedTaskDeliveryStatusByRunId, startTaskRunByRunId, } from "../../tasks/task-executor.js"; import { - findTaskByRunId, - markTaskTerminalById, - setTaskRunDeliveryStatusByRunId, - updateTaskNotifyPolicyById, -} from "../../tasks/task-registry.js"; + cancelTaskByIdForOwner, + findTaskByRunIdForOwner, + updateTaskNotifyPolicyForOwner, +} from "../../tasks/task-owner-access.js"; import { findActiveSessionTask } from "../session-async-task-status.js"; import { resolveSessionLane } from "./lanes.js"; import { log } from "./logger.js"; @@ -182,14 +182,15 @@ export function resetDeferredTurnMaintenanceStateForTest(): void { } function markDeferredTurnMaintenanceTaskScheduleFailure(params: { + sessionKey: string; taskId: string; error: unknown; }): void { const errorMessage = formatErrorMessage(params.error); log.warn(`failed to schedule deferred context engine maintenance: ${errorMessage}`); - markTaskTerminalById({ + cancelTaskByIdForOwner({ taskId: params.taskId, - status: "cancelled", + callerOwnerKey: params.sessionKey, endedAt: Date.now(), terminalSummary: `Deferred maintenance could not be scheduled: ${errorMessage}`, }); @@ -221,7 +222,10 @@ function promoteTurnMaintenanceTaskVisibility(params: { runId: string; notifyPolicy: "done_only" | "state_changes"; }) { - const task = findTaskByRunId(params.runId); + const task = findTaskByRunIdForOwner({ + runId: params.runId, + callerOwnerKey: params.sessionKey, + }); if (!task) { return createQueuedTaskRun({ runtime: "acp", @@ -238,19 +242,25 @@ function promoteTurnMaintenanceTaskVisibility(params: { preferMetadata: true, }); } - setTaskRunDeliveryStatusByRunId({ + setDetachedTaskDeliveryStatusByRunId({ runId: params.runId, runtime: "acp", sessionKey: params.sessionKey, deliveryStatus: "pending", }); if (task.notifyPolicy !== params.notifyPolicy) { - updateTaskNotifyPolicyById({ + updateTaskNotifyPolicyForOwner({ taskId: task.taskId, + callerOwnerKey: params.sessionKey, notifyPolicy: params.notifyPolicy, }); } - return findTaskByRunId(params.runId) ?? task; + return ( + findTaskByRunIdForOwner({ + runId: params.runId, + callerOwnerKey: params.sessionKey, + }) ?? task + ); } /** @@ -285,8 +295,9 @@ export function buildContextEngineMaintenanceRuntimeContext(params: { }); const rewriteSessionKey = normalizeSessionKey(params.sessionKey ?? params.sessionId); if (params.deferTranscriptRewriteToSessionLane && rewriteSessionKey) { - return await enqueueCommandInLane(resolveSessionLane(rewriteSessionKey), async () => - await rewriteTranscriptEntriesInFile(), + return await enqueueCommandInLane( + resolveSessionLane(rewriteSessionKey), + async () => await rewriteTranscriptEntriesInFile(), ); } return await rewriteTranscriptEntriesInFile(); @@ -443,11 +454,14 @@ async function runDeferredTurnMaintenanceWorker(params: { clearTimeout(longRunningTimer); longRunningTimer = null; } - const task = findTaskByRunId(params.runId); + const task = findTaskByRunIdForOwner({ + runId: params.runId, + callerOwnerKey: params.sessionKey, + }); if (task) { - markTaskTerminalById({ + cancelTaskByIdForOwner({ taskId: task.taskId, - status: "cancelled", + callerOwnerKey: params.sessionKey, endedAt: Date.now(), terminalSummary: "Deferred maintenance cancelled during shutdown.", }); @@ -502,13 +516,14 @@ function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceSchedule }); const reusableTask = existingTask?.runId?.trim() ? existingTask : undefined; if (existingTask && !reusableTask) { - updateTaskNotifyPolicyById({ + updateTaskNotifyPolicyForOwner({ taskId: existingTask.taskId, + callerOwnerKey: sessionKey, notifyPolicy: "silent", }); - markTaskTerminalById({ + cancelTaskByIdForOwner({ taskId: existingTask.taskId, - status: "cancelled", + callerOwnerKey: sessionKey, endedAt: Date.now(), terminalSummary: "Superseded by refreshed deferred maintenance task.", }); @@ -538,6 +553,7 @@ function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceSchedule ); } catch (err) { markDeferredTurnMaintenanceTaskScheduleFailure({ + sessionKey, taskId: task.taskId, error: err, }); @@ -547,6 +563,7 @@ function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceSchedule const trackedPromise = runPromise .catch((err) => { markDeferredTurnMaintenanceTaskScheduleFailure({ + sessionKey, taskId: task.taskId, error: err, }); diff --git a/src/tasks/task-owner-access.ts b/src/tasks/task-owner-access.ts index 5b26d7dbb62..041d2d2f365 100644 --- a/src/tasks/task-owner-access.ts +++ b/src/tasks/task-owner-access.ts @@ -3,9 +3,11 @@ import { findTaskByRunId, getTaskById, listTasksForRelatedSessionKey, + markTaskTerminalById as markTaskTerminalRecordById, resolveTaskForLookupToken, + updateTaskNotifyPolicyById, } from "./task-registry.js"; -import type { TaskRecord } from "./task-registry.types.js"; +import type { TaskNotifyPolicy, TaskRecord } from "./task-registry.types.js"; import { buildTaskStatusSnapshot } from "./task-status.js"; function canOwnerAccessTask(task: TaskRecord, callerOwnerKey: string): boolean { @@ -31,6 +33,47 @@ export function findTaskByRunIdForOwner(params: { return task && canOwnerAccessTask(task, params.callerOwnerKey) ? task : undefined; } +/** Update an owner-visible task's notification policy. */ +export function updateTaskNotifyPolicyForOwner(params: { + taskId: string; + callerOwnerKey: string; + notifyPolicy: TaskNotifyPolicy; +}): TaskRecord | null { + const task = getTaskByIdForOwner({ + taskId: params.taskId, + callerOwnerKey: params.callerOwnerKey, + }); + if (!task) { + return null; + } + return updateTaskNotifyPolicyById({ + taskId: task.taskId, + notifyPolicy: params.notifyPolicy, + }); +} + +/** Mark an owner-visible task as cancelled with a caller-provided summary. */ +export function cancelTaskByIdForOwner(params: { + taskId: string; + callerOwnerKey: string; + endedAt: number; + terminalSummary?: string | null; +}): TaskRecord | null { + const task = getTaskByIdForOwner({ + taskId: params.taskId, + callerOwnerKey: params.callerOwnerKey, + }); + if (!task) { + return null; + } + return markTaskTerminalRecordById({ + taskId: task.taskId, + status: "cancelled", + endedAt: params.endedAt, + terminalSummary: params.terminalSummary, + }); +} + export function listTasksForRelatedSessionKeyForOwner(params: { relatedSessionKey: string; callerOwnerKey: string;