diff --git a/src/tasks/task-flow-registry.store.ts b/src/tasks/task-flow-registry.store.ts index 8dd03a862ee..c131e7bf1b9 100644 --- a/src/tasks/task-flow-registry.store.ts +++ b/src/tasks/task-flow-registry.store.ts @@ -19,7 +19,7 @@ export type TaskFlowRegistryStore = { close?: () => void; }; -export type TaskFlowRegistryHookEvent = +export type TaskFlowRegistryObserverEvent = | { kind: "restored"; flows: TaskFlowRecord[]; @@ -35,9 +35,9 @@ export type TaskFlowRegistryHookEvent = previous: TaskFlowRecord; }; -export type TaskFlowRegistryHooks = { - // Hooks are incremental/observational. Snapshot persistence belongs to TaskFlowRegistryStore. - onEvent?: (event: TaskFlowRegistryHookEvent) => void; +export type TaskFlowRegistryObservers = { + // Observers are incremental/best-effort only. Snapshot persistence belongs to TaskFlowRegistryStore. + onEvent?: (event: TaskFlowRegistryObserverEvent) => void; }; const defaultFlowRegistryStore: TaskFlowRegistryStore = { @@ -49,30 +49,30 @@ const defaultFlowRegistryStore: TaskFlowRegistryStore = { }; let configuredFlowRegistryStore: TaskFlowRegistryStore = defaultFlowRegistryStore; -let configuredFlowRegistryHooks: TaskFlowRegistryHooks | null = null; +let configuredFlowRegistryObservers: TaskFlowRegistryObservers | null = null; export function getTaskFlowRegistryStore(): TaskFlowRegistryStore { return configuredFlowRegistryStore; } -export function getTaskFlowRegistryHooks(): TaskFlowRegistryHooks | null { - return configuredFlowRegistryHooks; +export function getTaskFlowRegistryObservers(): TaskFlowRegistryObservers | null { + return configuredFlowRegistryObservers; } export function configureTaskFlowRegistryRuntime(params: { store?: TaskFlowRegistryStore; - hooks?: TaskFlowRegistryHooks | null; + observers?: TaskFlowRegistryObservers | null; }) { if (params.store) { configuredFlowRegistryStore = params.store; } - if ("hooks" in params) { - configuredFlowRegistryHooks = params.hooks ?? null; + if ("observers" in params) { + configuredFlowRegistryObservers = params.observers ?? null; } } export function resetTaskFlowRegistryRuntimeForTests() { configuredFlowRegistryStore.close?.(); configuredFlowRegistryStore = defaultFlowRegistryStore; - configuredFlowRegistryHooks = null; + configuredFlowRegistryObservers = null; } diff --git a/src/tasks/task-flow-registry.test.ts b/src/tasks/task-flow-registry.test.ts index 213cab99e57..792f9d9a6b5 100644 --- a/src/tasks/task-flow-registry.test.ts +++ b/src/tasks/task-flow-registry.test.ts @@ -194,7 +194,7 @@ describe("task-flow-registry", () => { }); }); - it("emits restored, upserted, and deleted flow hook events", () => { + it("emits restored, upserted, and deleted flow observer events", () => { const onEvent = vi.fn(); configureTaskFlowRegistryRuntime({ store: { @@ -203,15 +203,15 @@ describe("task-flow-registry", () => { }), saveSnapshot: () => {}, }, - hooks: { + observers: { onEvent, }, }); const created = createManagedTaskFlow({ ownerKey: "agent:main:main", - controllerId: "tests/hooks", - goal: "Observe hooks", + controllerId: "tests/observers", + goal: "Observe observers", }); deleteTaskFlowRecordById(created.flowId); diff --git a/src/tasks/task-flow-registry.ts b/src/tasks/task-flow-registry.ts index 4f5777cc620..cd5275395a5 100644 --- a/src/tasks/task-flow-registry.ts +++ b/src/tasks/task-flow-registry.ts @@ -1,10 +1,10 @@ import crypto from "node:crypto"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { - getTaskFlowRegistryHooks, + getTaskFlowRegistryObservers, getTaskFlowRegistryStore, resetTaskFlowRegistryRuntimeForTests, - type TaskFlowRegistryHookEvent, + type TaskFlowRegistryObserverEvent, } from "./task-flow-registry.store.js"; import type { TaskFlowRecord, @@ -118,15 +118,15 @@ function snapshotFlowRecords(source: ReadonlyMap): TaskF return [...source.values()].map((record) => cloneFlowRecord(record)); } -function emitFlowRegistryHookEvent(createEvent: () => TaskFlowRegistryHookEvent): void { - const hooks = getTaskFlowRegistryHooks(); - if (!hooks?.onEvent) { +function emitFlowRegistryObserverEvent(createEvent: () => TaskFlowRegistryObserverEvent): void { + const observers = getTaskFlowRegistryObservers(); + if (!observers?.onEvent) { return; } try { - hooks.onEvent(createEvent()); + observers.onEvent(createEvent()); } catch { - // Flow hooks are observational. They must not break registry writes. + // Flow observers are best-effort only. They must not break registry writes. } } @@ -216,7 +216,7 @@ function ensureFlowRegistryReady() { log.warn("Failed to restore task-flow registry", { error }); return; } - emitFlowRegistryHookEvent(() => ({ + emitFlowRegistryObserverEvent(() => ({ kind: "restored", flows: snapshotFlowRecords(flows), })); @@ -339,7 +339,7 @@ function applyFlowPatch(current: TaskFlowRecord, patch: FlowRecordPatch): TaskFl function writeFlowRecord(next: TaskFlowRecord, previous?: TaskFlowRecord): TaskFlowRecord { flows.set(next.flowId, next); persistFlowUpsert(next); - emitFlowRegistryHookEvent(() => ({ + emitFlowRegistryObserverEvent(() => ({ kind: "upserted", flow: cloneFlowRecord(next), ...(previous ? { previous: cloneFlowRecord(previous) } : {}), @@ -695,7 +695,7 @@ export function deleteTaskFlowRecordById(flowId: string): boolean { } flows.delete(flowId); persistFlowDelete(flowId); - emitFlowRegistryHookEvent(() => ({ + emitFlowRegistryObserverEvent(() => ({ kind: "deleted", flowId, previous: cloneFlowRecord(current), diff --git a/src/tasks/task-registry.store.test.ts b/src/tasks/task-registry.store.test.ts index e7950d71e97..7304cd871f4 100644 --- a/src/tasks/task-registry.store.test.ts +++ b/src/tasks/task-registry.store.test.ts @@ -12,7 +12,10 @@ import { resetTaskRegistryForTests, } from "./task-registry.js"; import { resolveTaskRegistryDir, resolveTaskRegistrySqlitePath } from "./task-registry.paths.js"; -import { configureTaskRegistryRuntime, type TaskRegistryHookEvent } from "./task-registry.store.js"; +import { + configureTaskRegistryRuntime, + type TaskRegistryObserverEvent, +} from "./task-registry.store.js"; import type { TaskRecord } from "./task-registry.types.js"; function createStoredTask(): TaskRecord { @@ -80,8 +83,8 @@ describe("task-registry store runtime", () => { expect(latestSnapshot.tasks.get("task-restored")?.task).toBe("Restored task"); }); - it("emits incremental hook events for restore, mutation, and delete", () => { - const events: TaskRegistryHookEvent[] = []; + it("emits incremental observer events for restore, mutation, and delete", () => { + const events: TaskRegistryObserverEvent[] = []; configureTaskRegistryRuntime({ store: { loadSnapshot: () => ({ @@ -90,7 +93,7 @@ describe("task-registry store runtime", () => { }), saveSnapshot: () => {}, }, - hooks: { + observers: { onEvent: (event) => { events.push(event); }, diff --git a/src/tasks/task-registry.store.ts b/src/tasks/task-registry.store.ts index ba23bd91b2f..0d51121eafe 100644 --- a/src/tasks/task-registry.store.ts +++ b/src/tasks/task-registry.store.ts @@ -31,7 +31,7 @@ export type TaskRegistryStore = { close?: () => void; }; -export type TaskRegistryHookEvent = +export type TaskRegistryObserverEvent = | { kind: "restored"; tasks: TaskRecord[]; @@ -47,9 +47,9 @@ export type TaskRegistryHookEvent = previous: TaskRecord; }; -export type TaskRegistryHooks = { - // Hooks are incremental/observational. Snapshot persistence belongs to TaskRegistryStore. - onEvent?: (event: TaskRegistryHookEvent) => void; +export type TaskRegistryObservers = { + // Observers are incremental/best-effort only. Snapshot persistence belongs to TaskRegistryStore. + onEvent?: (event: TaskRegistryObserverEvent) => void; }; const defaultTaskRegistryStore: TaskRegistryStore = { @@ -65,30 +65,30 @@ const defaultTaskRegistryStore: TaskRegistryStore = { }; let configuredTaskRegistryStore: TaskRegistryStore = defaultTaskRegistryStore; -let configuredTaskRegistryHooks: TaskRegistryHooks | null = null; +let configuredTaskRegistryObservers: TaskRegistryObservers | null = null; export function getTaskRegistryStore(): TaskRegistryStore { return configuredTaskRegistryStore; } -export function getTaskRegistryHooks(): TaskRegistryHooks | null { - return configuredTaskRegistryHooks; +export function getTaskRegistryObservers(): TaskRegistryObservers | null { + return configuredTaskRegistryObservers; } export function configureTaskRegistryRuntime(params: { store?: TaskRegistryStore; - hooks?: TaskRegistryHooks | null; + observers?: TaskRegistryObservers | null; }) { if (params.store) { configuredTaskRegistryStore = params.store; } - if ("hooks" in params) { - configuredTaskRegistryHooks = params.hooks ?? null; + if ("observers" in params) { + configuredTaskRegistryObservers = params.observers ?? null; } } export function resetTaskRegistryRuntimeForTests() { configuredTaskRegistryStore.close?.(); configuredTaskRegistryStore = defaultTaskRegistryStore; - configuredTaskRegistryHooks = null; + configuredTaskRegistryObservers = null; } diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index d265ff8e204..3b3ef426531 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -25,10 +25,10 @@ import { updateFlowRecordByIdExpectedRevision, } from "./task-flow-runtime-internal.js"; import { - getTaskRegistryHooks, + getTaskRegistryObservers, getTaskRegistryStore, resetTaskRegistryRuntimeForTests, - type TaskRegistryHookEvent, + type TaskRegistryObserverEvent, } from "./task-registry.store.js"; import { summarizeTaskRecords } from "./task-registry.summary.js"; import type { @@ -174,15 +174,15 @@ function snapshotTaskRecords(source: ReadonlyMap): TaskRecor return [...source.values()].map((record) => cloneTaskRecord(record)); } -function emitTaskRegistryHookEvent(createEvent: () => TaskRegistryHookEvent): void { - const hooks = getTaskRegistryHooks(); - if (!hooks?.onEvent) { +function emitTaskRegistryObserverEvent(createEvent: () => TaskRegistryObserverEvent): void { + const observers = getTaskRegistryObservers(); + if (!observers?.onEvent) { return; } try { - hooks.onEvent(createEvent()); + observers.onEvent(createEvent()); } catch (error) { - log.warn("Task registry hook failed", { + log.warn("Task registry observer failed", { event: "task-registry", error, }); @@ -826,7 +826,7 @@ function restoreTaskRegistryOnce() { rebuildOwnerKeyIndex(); rebuildParentFlowIdIndex(); rebuildRelatedSessionKeyIndex(); - emitTaskRegistryHookEvent(() => ({ + emitTaskRegistryObserverEvent(() => ({ kind: "restored", tasks: snapshotTaskRecords(tasks), })); @@ -888,7 +888,7 @@ function updateTask(taskId: string, patch: Partial): TaskRecord | nu error, }); } - emitTaskRegistryHookEvent(() => ({ + emitTaskRegistryObserverEvent(() => ({ kind: "upserted", task: cloneTaskRecord(next), previous: cloneTaskRecord(current), @@ -1456,7 +1456,7 @@ export function createTaskRecord(params: { error, }); } - emitTaskRegistryHookEvent(() => ({ + emitTaskRegistryObserverEvent(() => ({ kind: "upserted", task: cloneTaskRecord(record), })); @@ -1902,7 +1902,7 @@ export function deleteTaskRecordById(taskId: string): boolean { rebuildRunIdIndex(); persistTaskDelete(taskId); persistTaskDeliveryStateDelete(taskId); - emitTaskRegistryHookEvent(() => ({ + emitTaskRegistryObserverEvent(() => ({ kind: "deleted", taskId: current.taskId, previous: cloneTaskRecord(current),