diff --git a/src/tasks/task-executor-policy.test.ts b/src/tasks/task-executor-policy.test.ts new file mode 100644 index 00000000000..0c4d07838c4 --- /dev/null +++ b/src/tasks/task-executor-policy.test.ts @@ -0,0 +1,110 @@ +import { describe, expect, it } from "vitest"; +import { + formatTaskBlockedFollowupMessage, + formatTaskStateChangeMessage, + formatTaskTerminalMessage, + isTerminalTaskStatus, + shouldAutoDeliverTaskStateChange, + shouldAutoDeliverTaskTerminalUpdate, + shouldSuppressDuplicateTerminalDelivery, +} from "./task-executor-policy.js"; +import type { TaskEventRecord, TaskRecord } from "./task-registry.types.js"; + +function createTask(partial: Partial): TaskRecord { + return { + taskId: partial.taskId ?? "task-1", + runtime: partial.runtime ?? "acp", + requesterSessionKey: partial.requesterSessionKey ?? "agent:main:main", + task: partial.task ?? "Investigate issue", + status: partial.status ?? "running", + deliveryStatus: partial.deliveryStatus ?? "pending", + notifyPolicy: partial.notifyPolicy ?? "done_only", + createdAt: partial.createdAt ?? 1, + ...partial, + }; +} + +describe("task-executor-policy", () => { + it("identifies terminal statuses", () => { + expect(isTerminalTaskStatus("queued")).toBe(false); + expect(isTerminalTaskStatus("running")).toBe(false); + expect(isTerminalTaskStatus("succeeded")).toBe(true); + expect(isTerminalTaskStatus("failed")).toBe(true); + expect(isTerminalTaskStatus("timed_out")).toBe(true); + expect(isTerminalTaskStatus("cancelled")).toBe(true); + expect(isTerminalTaskStatus("lost")).toBe(true); + }); + + it("formats terminal, followup, and progress messages", () => { + const blockedTask = createTask({ + status: "succeeded", + terminalOutcome: "blocked", + terminalSummary: "Needs login.", + runId: "run-1234567890", + label: "ACP import", + }); + const progressEvent: TaskEventRecord = { + at: 10, + kind: "progress", + summary: "No output for 60s.", + }; + + expect(formatTaskTerminalMessage(blockedTask)).toBe( + "Background task blocked: ACP import (run run-1234). Needs login.", + ); + expect(formatTaskBlockedFollowupMessage(blockedTask)).toBe( + "Task needs follow-up: ACP import (run run-1234). Needs login.", + ); + expect(formatTaskStateChangeMessage(blockedTask, progressEvent)).toBe( + "Background task update: ACP import. No output for 60s.", + ); + }); + + it("keeps delivery policy decisions explicit", () => { + expect( + shouldAutoDeliverTaskTerminalUpdate( + createTask({ + status: "succeeded", + deliveryStatus: "pending", + notifyPolicy: "done_only", + }), + ), + ).toBe(true); + expect( + shouldAutoDeliverTaskTerminalUpdate( + createTask({ + runtime: "subagent", + status: "succeeded", + deliveryStatus: "pending", + }), + ), + ).toBe(false); + expect( + shouldAutoDeliverTaskStateChange( + createTask({ + status: "running", + notifyPolicy: "state_changes", + deliveryStatus: "pending", + }), + ), + ).toBe(true); + expect( + shouldAutoDeliverTaskStateChange( + createTask({ + status: "failed", + notifyPolicy: "state_changes", + deliveryStatus: "pending", + }), + ), + ).toBe(false); + expect( + shouldSuppressDuplicateTerminalDelivery({ + task: createTask({ + runtime: "acp", + runId: "run-duplicate", + }), + preferredTaskId: "task-2", + }), + ).toBe(true); + }); +}); diff --git a/src/tasks/task-executor-policy.ts b/src/tasks/task-executor-policy.ts new file mode 100644 index 00000000000..bc9e8f6508c --- /dev/null +++ b/src/tasks/task-executor-policy.ts @@ -0,0 +1,110 @@ +import type { TaskEventRecord, TaskRecord, TaskStatus } from "./task-registry.types.js"; + +export function isTerminalTaskStatus(status: TaskStatus): boolean { + return ( + status === "succeeded" || + status === "failed" || + status === "timed_out" || + status === "cancelled" || + status === "lost" + ); +} + +function resolveTaskDisplayTitle(task: TaskRecord): string { + return ( + task.label?.trim() || + (task.runtime === "acp" + ? "ACP background task" + : task.runtime === "subagent" + ? "Subagent task" + : task.task.trim() || "Background task") + ); +} + +function resolveTaskRunLabel(task: TaskRecord): string { + return task.runId ? ` (run ${task.runId.slice(0, 8)})` : ""; +} + +export function formatTaskTerminalMessage(task: TaskRecord): string { + const title = resolveTaskDisplayTitle(task); + const runLabel = resolveTaskRunLabel(task); + const summary = task.terminalSummary?.trim(); + if (task.status === "succeeded") { + if (task.terminalOutcome === "blocked") { + return summary + ? `Background task blocked: ${title}${runLabel}. ${summary}` + : `Background task blocked: ${title}${runLabel}.`; + } + return summary + ? `Background task done: ${title}${runLabel}. ${summary}` + : `Background task done: ${title}${runLabel}.`; + } + if (task.status === "timed_out") { + return `Background task timed out: ${title}${runLabel}.`; + } + if (task.status === "lost") { + return `Background task lost: ${title}${runLabel}. ${task.error ?? "Backing session disappeared."}`; + } + if (task.status === "cancelled") { + return `Background task cancelled: ${title}${runLabel}.`; + } + const error = task.error?.trim(); + return error + ? `Background task failed: ${title}${runLabel}. ${error}` + : `Background task failed: ${title}${runLabel}.`; +} + +export function formatTaskBlockedFollowupMessage(task: TaskRecord): string | null { + if (task.status !== "succeeded" || task.terminalOutcome !== "blocked") { + return null; + } + const title = resolveTaskDisplayTitle(task); + const runLabel = resolveTaskRunLabel(task); + const summary = task.terminalSummary?.trim() || "Task is blocked and needs follow-up."; + return `Task needs follow-up: ${title}${runLabel}. ${summary}`; +} + +export function formatTaskStateChangeMessage( + task: TaskRecord, + event: TaskEventRecord, +): string | null { + const title = resolveTaskDisplayTitle(task); + if (event.kind === "running") { + return `Background task started: ${title}.`; + } + if (event.kind === "progress") { + return event.summary ? `Background task update: ${title}. ${event.summary}` : null; + } + return null; +} + +export function shouldAutoDeliverTaskTerminalUpdate(task: TaskRecord): boolean { + if (task.notifyPolicy === "silent") { + return false; + } + if (task.runtime === "subagent" && task.status !== "cancelled") { + return false; + } + if (!isTerminalTaskStatus(task.status)) { + return false; + } + return task.deliveryStatus === "pending"; +} + +export function shouldAutoDeliverTaskStateChange(task: TaskRecord): boolean { + return ( + task.notifyPolicy === "state_changes" && + task.deliveryStatus === "pending" && + !isTerminalTaskStatus(task.status) + ); +} + +export function shouldSuppressDuplicateTerminalDelivery(params: { + task: TaskRecord; + preferredTaskId?: string; +}): boolean { + if (params.task.runtime !== "acp" || !params.task.runId?.trim()) { + return false; + } + return Boolean(params.preferredTaskId && params.preferredTaskId !== params.task.taskId); +} diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index 5ba68d8eddf..6158f21eeb0 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -9,6 +9,15 @@ import { createSubsystemLogger } from "../logging/subsystem.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; import { normalizeDeliveryContext } from "../utils/delivery-context.js"; import { isDeliverableMessageChannel } from "../utils/message-channel.js"; +import { + formatTaskBlockedFollowupMessage, + formatTaskStateChangeMessage, + formatTaskTerminalMessage, + isTerminalTaskStatus, + shouldAutoDeliverTaskStateChange, + shouldAutoDeliverTaskTerminalUpdate, + shouldSuppressDuplicateTerminalDelivery, +} from "./task-executor-policy.js"; import { getTaskRegistryHooks, getTaskRegistryStore, @@ -384,16 +393,6 @@ export function ensureTaskRegistryReady() { ensureListener(); } -function isTerminalTaskStatus(status: TaskStatus): boolean { - return ( - status === "succeeded" || - status === "failed" || - status === "timed_out" || - status === "cancelled" || - status === "lost" - ); -} - function updateTask(taskId: string, patch: Partial): TaskRecord | null { const current = tasks.get(taskId); if (!current) { @@ -441,43 +440,6 @@ function getTaskDeliveryState(taskId: string): TaskDeliveryState | undefined { return state ? cloneTaskDeliveryState(state) : undefined; } -function formatTaskTerminalEvent(task: TaskRecord): string { - // User-facing task notifications stay intentionally terse. Detailed runtime chatter lives - // in task metadata for inspection, not in the default channel ping. - const title = - task.label?.trim() || - (task.runtime === "acp" - ? "ACP background task" - : task.runtime === "subagent" - ? "Subagent task" - : task.task.trim() || "Background task"); - const runLabel = task.runId ? ` (run ${task.runId.slice(0, 8)})` : ""; - const summary = task.terminalSummary?.trim(); - if (task.status === "succeeded") { - if (task.terminalOutcome === "blocked") { - return summary - ? `Background task blocked: ${title}${runLabel}. ${summary}` - : `Background task blocked: ${title}${runLabel}.`; - } - return summary - ? `Background task done: ${title}${runLabel}. ${summary}` - : `Background task done: ${title}${runLabel}.`; - } - if (task.status === "timed_out") { - return `Background task timed out: ${title}${runLabel}.`; - } - if (task.status === "lost") { - return `Background task lost: ${title}${runLabel}. ${task.error ?? "Backing session disappeared."}`; - } - if (task.status === "cancelled") { - return `Background task cancelled: ${title}${runLabel}.`; - } - const error = task.error?.trim(); - return error - ? `Background task failed: ${title}${runLabel}. ${error}` - : `Background task failed: ${title}${runLabel}.`; -} - function canDeliverTaskToRequesterOrigin(task: TaskRecord): boolean { const origin = normalizeDeliveryContext(taskDeliveryStates.get(task.taskId)?.requesterOrigin); const channel = origin?.channel?.trim(); @@ -503,23 +465,15 @@ function queueTaskSystemEvent(task: TaskRecord, text: string) { } function queueBlockedTaskFollowup(task: TaskRecord) { - if (task.status !== "succeeded" || task.terminalOutcome !== "blocked") { + const followupText = formatTaskBlockedFollowupMessage(task); + if (!followupText) { return false; } const requesterSessionKey = task.requesterSessionKey.trim(); if (!requesterSessionKey) { return false; } - const title = - task.label?.trim() || - (task.runtime === "acp" - ? "ACP background task" - : task.runtime === "subagent" - ? "Subagent task" - : task.task.trim() || "Background task"); - const runLabel = task.runId ? ` (run ${task.runId.slice(0, 8)})` : ""; - const summary = task.terminalSummary?.trim() || "Task is blocked and needs follow-up."; - enqueueSystemEvent(`Task needs follow-up: ${title}${runLabel}. ${summary}`, { + enqueueSystemEvent(followupText, { sessionKey: requesterSessionKey, contextKey: `task:${task.taskId}:blocked-followup`, deliveryContext: taskDeliveryStates.get(task.taskId)?.requesterOrigin, @@ -531,66 +485,10 @@ function queueBlockedTaskFollowup(task: TaskRecord) { return true; } -function formatTaskStateChangeEvent(task: TaskRecord, event: TaskEventRecord): string | null { - const title = - task.label?.trim() || - (task.runtime === "acp" - ? "ACP background task" - : task.runtime === "subagent" - ? "Subagent task" - : task.task.trim() || "Background task"); - if (event.kind === "running") { - return `Background task started: ${title}.`; - } - if (event.kind === "progress") { - return event.summary ? `Background task update: ${title}. ${event.summary}` : null; - } - return null; -} - -function shouldAutoDeliverTaskUpdate(task: TaskRecord): boolean { - if (task.notifyPolicy === "silent") { - return false; - } - if (task.runtime === "subagent" && task.status !== "cancelled") { - return false; - } - if ( - task.status !== "succeeded" && - task.status !== "failed" && - task.status !== "timed_out" && - task.status !== "lost" && - task.status !== "cancelled" - ) { - return false; - } - return task.deliveryStatus === "pending"; -} - -function shouldAutoDeliverTaskStateChange(task: TaskRecord): boolean { - return ( - task.notifyPolicy === "state_changes" && - task.deliveryStatus === "pending" && - task.status !== "succeeded" && - task.status !== "failed" && - task.status !== "timed_out" && - task.status !== "lost" && - task.status !== "cancelled" - ); -} - -function shouldSuppressDuplicateTerminalDelivery(task: TaskRecord): boolean { - if (task.runtime !== "acp" || !task.runId?.trim()) { - return false; - } - const preferred = pickPreferredRunIdTask(getTasksByRunId(task.runId)); - return Boolean(preferred && preferred.taskId !== task.taskId); -} - export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise { ensureTaskRegistryReady(); const current = tasks.get(taskId); - if (!current || !shouldAutoDeliverTaskUpdate(current)) { + if (!current || !shouldAutoDeliverTaskTerminalUpdate(current)) { return current ? cloneTaskRecord(current) : null; } if (tasksWithPendingDelivery.has(taskId)) { @@ -599,10 +497,15 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise= latestEvent.at) { return cloneTaskRecord(current); } - const eventText = formatTaskStateChangeEvent(current, latestEvent); + const eventText = formatTaskStateChangeMessage(current, latestEvent); if (!eventText) { return cloneTaskRecord(current); }