refactor(tasks): rename registry hooks to observers (#59829)

This commit is contained in:
Vincent Koc
2026-04-03 02:42:59 +09:00
committed by GitHub
parent 676b748056
commit 7aa22959e4
6 changed files with 54 additions and 51 deletions

View File

@@ -19,7 +19,7 @@ export type TaskFlowRegistryStore = {
close?: () => void;
};
export type TaskFlowRegistryHookEvent =
export type TaskFlowRegistryObserverEvent =
| {
kind: "restored";
flows: TaskFlowRecord[];
@@ -35,9 +35,9 @@ export type TaskFlowRegistryHookEvent =
previous: TaskFlowRecord;
};
export type TaskFlowRegistryHooks = {
// Hooks are incremental/observational. Snapshot persistence belongs to TaskFlowRegistryStore.
onEvent?: (event: TaskFlowRegistryHookEvent) => void;
export type TaskFlowRegistryObservers = {
// Observers are incremental/best-effort only. Snapshot persistence belongs to TaskFlowRegistryStore.
onEvent?: (event: TaskFlowRegistryObserverEvent) => void;
};
const defaultFlowRegistryStore: TaskFlowRegistryStore = {
@@ -49,30 +49,30 @@ const defaultFlowRegistryStore: TaskFlowRegistryStore = {
};
let configuredFlowRegistryStore: TaskFlowRegistryStore = defaultFlowRegistryStore;
let configuredFlowRegistryHooks: TaskFlowRegistryHooks | null = null;
let configuredFlowRegistryObservers: TaskFlowRegistryObservers | null = null;
export function getTaskFlowRegistryStore(): TaskFlowRegistryStore {
return configuredFlowRegistryStore;
}
export function getTaskFlowRegistryHooks(): TaskFlowRegistryHooks | null {
return configuredFlowRegistryHooks;
export function getTaskFlowRegistryObservers(): TaskFlowRegistryObservers | null {
return configuredFlowRegistryObservers;
}
export function configureTaskFlowRegistryRuntime(params: {
store?: TaskFlowRegistryStore;
hooks?: TaskFlowRegistryHooks | null;
observers?: TaskFlowRegistryObservers | null;
}) {
if (params.store) {
configuredFlowRegistryStore = params.store;
}
if ("hooks" in params) {
configuredFlowRegistryHooks = params.hooks ?? null;
if ("observers" in params) {
configuredFlowRegistryObservers = params.observers ?? null;
}
}
export function resetTaskFlowRegistryRuntimeForTests() {
configuredFlowRegistryStore.close?.();
configuredFlowRegistryStore = defaultFlowRegistryStore;
configuredFlowRegistryHooks = null;
configuredFlowRegistryObservers = null;
}

View File

@@ -194,7 +194,7 @@ describe("task-flow-registry", () => {
});
});
it("emits restored, upserted, and deleted flow hook events", () => {
it("emits restored, upserted, and deleted flow observer events", () => {
const onEvent = vi.fn();
configureTaskFlowRegistryRuntime({
store: {
@@ -203,15 +203,15 @@ describe("task-flow-registry", () => {
}),
saveSnapshot: () => {},
},
hooks: {
observers: {
onEvent,
},
});
const created = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/hooks",
goal: "Observe hooks",
controllerId: "tests/observers",
goal: "Observe observers",
});
deleteTaskFlowRecordById(created.flowId);

View File

@@ -1,10 +1,10 @@
import crypto from "node:crypto";
import { createSubsystemLogger } from "../logging/subsystem.js";
import {
getTaskFlowRegistryHooks,
getTaskFlowRegistryObservers,
getTaskFlowRegistryStore,
resetTaskFlowRegistryRuntimeForTests,
type TaskFlowRegistryHookEvent,
type TaskFlowRegistryObserverEvent,
} from "./task-flow-registry.store.js";
import type {
TaskFlowRecord,
@@ -118,15 +118,15 @@ function snapshotFlowRecords(source: ReadonlyMap<string, TaskFlowRecord>): TaskF
return [...source.values()].map((record) => cloneFlowRecord(record));
}
function emitFlowRegistryHookEvent(createEvent: () => TaskFlowRegistryHookEvent): void {
const hooks = getTaskFlowRegistryHooks();
if (!hooks?.onEvent) {
function emitFlowRegistryObserverEvent(createEvent: () => TaskFlowRegistryObserverEvent): void {
const observers = getTaskFlowRegistryObservers();
if (!observers?.onEvent) {
return;
}
try {
hooks.onEvent(createEvent());
observers.onEvent(createEvent());
} catch {
// Flow hooks are observational. They must not break registry writes.
// Flow observers are best-effort only. They must not break registry writes.
}
}
@@ -216,7 +216,7 @@ function ensureFlowRegistryReady() {
log.warn("Failed to restore task-flow registry", { error });
return;
}
emitFlowRegistryHookEvent(() => ({
emitFlowRegistryObserverEvent(() => ({
kind: "restored",
flows: snapshotFlowRecords(flows),
}));
@@ -339,7 +339,7 @@ function applyFlowPatch(current: TaskFlowRecord, patch: FlowRecordPatch): TaskFl
function writeFlowRecord(next: TaskFlowRecord, previous?: TaskFlowRecord): TaskFlowRecord {
flows.set(next.flowId, next);
persistFlowUpsert(next);
emitFlowRegistryHookEvent(() => ({
emitFlowRegistryObserverEvent(() => ({
kind: "upserted",
flow: cloneFlowRecord(next),
...(previous ? { previous: cloneFlowRecord(previous) } : {}),
@@ -695,7 +695,7 @@ export function deleteTaskFlowRecordById(flowId: string): boolean {
}
flows.delete(flowId);
persistFlowDelete(flowId);
emitFlowRegistryHookEvent(() => ({
emitFlowRegistryObserverEvent(() => ({
kind: "deleted",
flowId,
previous: cloneFlowRecord(current),

View File

@@ -12,7 +12,10 @@ import {
resetTaskRegistryForTests,
} from "./task-registry.js";
import { resolveTaskRegistryDir, resolveTaskRegistrySqlitePath } from "./task-registry.paths.js";
import { configureTaskRegistryRuntime, type TaskRegistryHookEvent } from "./task-registry.store.js";
import {
configureTaskRegistryRuntime,
type TaskRegistryObserverEvent,
} from "./task-registry.store.js";
import type { TaskRecord } from "./task-registry.types.js";
function createStoredTask(): TaskRecord {
@@ -80,8 +83,8 @@ describe("task-registry store runtime", () => {
expect(latestSnapshot.tasks.get("task-restored")?.task).toBe("Restored task");
});
it("emits incremental hook events for restore, mutation, and delete", () => {
const events: TaskRegistryHookEvent[] = [];
it("emits incremental observer events for restore, mutation, and delete", () => {
const events: TaskRegistryObserverEvent[] = [];
configureTaskRegistryRuntime({
store: {
loadSnapshot: () => ({
@@ -90,7 +93,7 @@ describe("task-registry store runtime", () => {
}),
saveSnapshot: () => {},
},
hooks: {
observers: {
onEvent: (event) => {
events.push(event);
},

View File

@@ -31,7 +31,7 @@ export type TaskRegistryStore = {
close?: () => void;
};
export type TaskRegistryHookEvent =
export type TaskRegistryObserverEvent =
| {
kind: "restored";
tasks: TaskRecord[];
@@ -47,9 +47,9 @@ export type TaskRegistryHookEvent =
previous: TaskRecord;
};
export type TaskRegistryHooks = {
// Hooks are incremental/observational. Snapshot persistence belongs to TaskRegistryStore.
onEvent?: (event: TaskRegistryHookEvent) => void;
export type TaskRegistryObservers = {
// Observers are incremental/best-effort only. Snapshot persistence belongs to TaskRegistryStore.
onEvent?: (event: TaskRegistryObserverEvent) => void;
};
const defaultTaskRegistryStore: TaskRegistryStore = {
@@ -65,30 +65,30 @@ const defaultTaskRegistryStore: TaskRegistryStore = {
};
let configuredTaskRegistryStore: TaskRegistryStore = defaultTaskRegistryStore;
let configuredTaskRegistryHooks: TaskRegistryHooks | null = null;
let configuredTaskRegistryObservers: TaskRegistryObservers | null = null;
export function getTaskRegistryStore(): TaskRegistryStore {
return configuredTaskRegistryStore;
}
export function getTaskRegistryHooks(): TaskRegistryHooks | null {
return configuredTaskRegistryHooks;
export function getTaskRegistryObservers(): TaskRegistryObservers | null {
return configuredTaskRegistryObservers;
}
export function configureTaskRegistryRuntime(params: {
store?: TaskRegistryStore;
hooks?: TaskRegistryHooks | null;
observers?: TaskRegistryObservers | null;
}) {
if (params.store) {
configuredTaskRegistryStore = params.store;
}
if ("hooks" in params) {
configuredTaskRegistryHooks = params.hooks ?? null;
if ("observers" in params) {
configuredTaskRegistryObservers = params.observers ?? null;
}
}
export function resetTaskRegistryRuntimeForTests() {
configuredTaskRegistryStore.close?.();
configuredTaskRegistryStore = defaultTaskRegistryStore;
configuredTaskRegistryHooks = null;
configuredTaskRegistryObservers = null;
}

View File

@@ -25,10 +25,10 @@ import {
updateFlowRecordByIdExpectedRevision,
} from "./task-flow-runtime-internal.js";
import {
getTaskRegistryHooks,
getTaskRegistryObservers,
getTaskRegistryStore,
resetTaskRegistryRuntimeForTests,
type TaskRegistryHookEvent,
type TaskRegistryObserverEvent,
} from "./task-registry.store.js";
import { summarizeTaskRecords } from "./task-registry.summary.js";
import type {
@@ -174,15 +174,15 @@ function snapshotTaskRecords(source: ReadonlyMap<string, TaskRecord>): TaskRecor
return [...source.values()].map((record) => cloneTaskRecord(record));
}
function emitTaskRegistryHookEvent(createEvent: () => TaskRegistryHookEvent): void {
const hooks = getTaskRegistryHooks();
if (!hooks?.onEvent) {
function emitTaskRegistryObserverEvent(createEvent: () => TaskRegistryObserverEvent): void {
const observers = getTaskRegistryObservers();
if (!observers?.onEvent) {
return;
}
try {
hooks.onEvent(createEvent());
observers.onEvent(createEvent());
} catch (error) {
log.warn("Task registry hook failed", {
log.warn("Task registry observer failed", {
event: "task-registry",
error,
});
@@ -826,7 +826,7 @@ function restoreTaskRegistryOnce() {
rebuildOwnerKeyIndex();
rebuildParentFlowIdIndex();
rebuildRelatedSessionKeyIndex();
emitTaskRegistryHookEvent(() => ({
emitTaskRegistryObserverEvent(() => ({
kind: "restored",
tasks: snapshotTaskRecords(tasks),
}));
@@ -888,7 +888,7 @@ function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | nu
error,
});
}
emitTaskRegistryHookEvent(() => ({
emitTaskRegistryObserverEvent(() => ({
kind: "upserted",
task: cloneTaskRecord(next),
previous: cloneTaskRecord(current),
@@ -1456,7 +1456,7 @@ export function createTaskRecord(params: {
error,
});
}
emitTaskRegistryHookEvent(() => ({
emitTaskRegistryObserverEvent(() => ({
kind: "upserted",
task: cloneTaskRecord(record),
}));
@@ -1902,7 +1902,7 @@ export function deleteTaskRecordById(taskId: string): boolean {
rebuildRunIdIndex();
persistTaskDelete(taskId);
persistTaskDeliveryStateDelete(taskId);
emitTaskRegistryHookEvent(() => ({
emitTaskRegistryObserverEvent(() => ({
kind: "deleted",
taskId: current.taskId,
previous: cloneTaskRecord(current),