fix(hooks): expose typed gateway startup context

This commit is contained in:
Vincent Koc
2026-04-22 11:21:27 -07:00
parent 3e24898690
commit 6d003cbcee
9 changed files with 262 additions and 216 deletions

View File

@@ -3,12 +3,6 @@ import path from "node:path";
import { enqueueSystemEvent, resetSystemEventsForTest } from "openclaw/plugin-sdk/infra-runtime";
import type { OpenClawConfig } from "openclaw/plugin-sdk/memory-core";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
clearInternalHooks,
createInternalHookEvent,
registerInternalHook,
triggerInternalHook,
} from "../../../src/hooks/internal-hooks.js";
import {
__testing,
reconcileShortTermDreamingCronJob,
@@ -26,6 +20,8 @@ afterEach(() => {
resetSystemEventsForTest();
});
function clearInternalHooks(): void {}
type CronParam = NonNullable<Parameters<typeof reconcileShortTermDreamingCronJob>[0]["cron"]>;
type CronJobLike = Awaited<ReturnType<CronParam["list"]>>[number];
type CronAddInput = Parameters<CronParam["add"]>[0];
@@ -36,7 +32,7 @@ type DreamingPluginApiTestDouble = {
pluginConfig: Record<string, unknown>;
logger: ReturnType<typeof createLogger>;
runtime: unknown;
registerHook: (event: string, handler: Parameters<typeof registerInternalHook>[1]) => void;
registerHook: (event: string, handler: (event: unknown) => unknown) => void;
on: ReturnType<typeof vi.fn>;
};
@@ -156,6 +152,29 @@ function getBeforeAgentReplyHandler(
) => Promise<unknown>;
}
function getGatewayStartHandler(
onMock: ReturnType<typeof vi.fn>,
): (
event: { port: number },
ctx: { config?: OpenClawConfig; workspaceDir?: string; getCron?: () => unknown },
) => Promise<unknown> {
const call = onMock.mock.calls.find(([eventName]) => eventName === "gateway_start");
if (!call) {
throw new Error("gateway_start hook was not registered");
}
return call[1] as (
event: { port: number },
ctx: { config?: OpenClawConfig; workspaceDir?: string; getCron?: () => unknown },
) => Promise<unknown>;
}
async function triggerGatewayStart(
onMock: ReturnType<typeof vi.fn>,
ctx: { config?: OpenClawConfig; workspaceDir?: string; getCron?: () => unknown },
): Promise<void> {
await getGatewayStartHandler(onMock)({ port: 18789 }, ctx);
}
function registerShortTermPromotionDreamingForTest(api: DreamingPluginApiTestDouble): void {
registerShortTermPromotionDreaming(api as unknown as DreamingPluginApi);
}
@@ -380,17 +399,11 @@ describe("short-term dreaming config", () => {
});
});
describe("short-term dreaming startup event parsing", () => {
it("resolves cron service from gateway startup event deps", () => {
describe("short-term dreaming gateway_start context parsing", () => {
it("resolves cron service from the typed gateway_start cron getter", () => {
const harness = createCronHarness();
const resolved = __testing.resolveCronServiceFromStartupEvent({
type: "gateway",
action: "startup",
context: {
deps: {
cron: harness.cron,
},
},
const resolved = __testing.resolveCronServiceFromGatewayContext({
getCron: () => harness.cron,
});
expect(resolved).toBe(harness.cron);
});
@@ -720,40 +733,37 @@ describe("gateway startup reconciliation", () => {
clearInternalHooks();
const logger = createLogger();
const harness = createCronHarness();
const onMock = vi.fn();
const api: DreamingPluginApiTestDouble = {
config: { plugins: { entries: {} } },
pluginConfig: {},
logger,
runtime: {},
registerHook: (event: string, handler: Parameters<typeof registerInternalHook>[1]) => {
registerInternalHook(event, handler);
},
on: vi.fn(),
registerHook: () => {},
on: onMock,
};
try {
registerShortTermPromotionDreamingForTest(api);
await triggerInternalHook(
createInternalHookEvent("gateway", "startup", "gateway:startup", {
cfg: {
hooks: { internal: { enabled: true } },
plugins: {
entries: {
"memory-core": {
config: {
dreaming: {
enabled: true,
frequency: "15 4 * * *",
timezone: "UTC",
},
await triggerGatewayStart(onMock, {
config: {
hooks: { internal: { enabled: true } },
plugins: {
entries: {
"memory-core": {
config: {
dreaming: {
enabled: true,
frequency: "15 4 * * *",
timezone: "UTC",
},
},
},
},
} as OpenClawConfig,
deps: { cron: harness.cron },
}),
);
},
} as OpenClawConfig,
getCron: () => harness.cron,
});
expect(harness.addCalls).toHaveLength(1);
expect(harness.addCalls[0]).toMatchObject({
@@ -795,21 +805,16 @@ describe("gateway startup reconciliation", () => {
pluginConfig: {},
logger,
runtime: {},
registerHook: (event: string, handler: Parameters<typeof registerInternalHook>[1]) => {
registerInternalHook(event, handler);
},
registerHook: () => {},
on: onMock,
};
try {
registerShortTermPromotionDreamingForTest(api);
const deps = { cron: harness.cron };
await triggerInternalHook(
createInternalHookEvent("gateway", "startup", "gateway:startup", {
cfg: api.config,
deps,
}),
);
await triggerGatewayStart(onMock, {
config: api.config,
getCron: () => harness.cron,
});
expect(harness.addCalls).toHaveLength(0);
@@ -870,21 +875,17 @@ describe("gateway startup reconciliation", () => {
pluginConfig: {},
logger,
runtime: {},
registerHook: (event: string, handler: Parameters<typeof registerInternalHook>[1]) => {
registerInternalHook(event, handler);
},
registerHook: () => {},
on: onMock,
};
try {
registerShortTermPromotionDreamingForTest(api);
const deps = { cron: startupHarness.cron };
await triggerInternalHook(
createInternalHookEvent("gateway", "startup", "gateway:startup", {
cfg: api.config,
deps,
}),
);
const cronRef = { current: startupHarness.cron };
await triggerGatewayStart(onMock, {
config: api.config,
getCron: () => cronRef.current,
});
expect(startupHarness.addCalls).toHaveLength(1);
const managed = startupHarness.jobs.find((job) =>
@@ -903,7 +904,7 @@ describe("gateway startup reconciliation", () => {
]
: [],
);
deps.cron = reloadedHarness.cron;
cronRef.current = reloadedHarness.cron;
api.config = {
plugins: {
entries: {
@@ -962,20 +963,16 @@ describe("gateway startup reconciliation", () => {
pluginConfig: {},
logger,
runtime: {},
registerHook: (event: string, handler: Parameters<typeof registerInternalHook>[1]) => {
registerInternalHook(event, handler);
},
registerHook: () => {},
on: onMock,
};
try {
registerShortTermPromotionDreamingForTest(api);
await triggerInternalHook(
createInternalHookEvent("gateway", "startup", "gateway:startup", {
cfg: api.config,
deps: { cron: harness.cron },
}),
);
await triggerGatewayStart(onMock, {
config: api.config,
getCron: () => harness.cron,
});
expect(harness.addCalls).toHaveLength(1);
harness.jobs.splice(
@@ -1028,20 +1025,16 @@ describe("gateway startup reconciliation", () => {
pluginConfig: {},
logger,
runtime: {},
registerHook: (event: string, handler: Parameters<typeof registerInternalHook>[1]) => {
registerInternalHook(event, handler);
},
registerHook: () => {},
on: onMock,
};
try {
registerShortTermPromotionDreamingForTest(api);
await triggerInternalHook(
createInternalHookEvent("gateway", "startup", "gateway:startup", {
cfg: api.config,
deps: { cron: harness.cron },
}),
);
await triggerGatewayStart(onMock, {
config: api.config,
getCron: () => harness.cron,
});
expect(harness.listCalls).toBe(1);
@@ -1084,20 +1077,16 @@ describe("gateway startup reconciliation", () => {
pluginConfig: {},
logger,
runtime: {},
registerHook: (event: string, handler: Parameters<typeof registerInternalHook>[1]) => {
registerInternalHook(event, handler);
},
registerHook: () => {},
on: onMock,
};
try {
registerShortTermPromotionDreamingForTest(api);
await triggerInternalHook(
createInternalHookEvent("gateway", "startup", "gateway:startup", {
cfg: api.config,
deps: { cron: harness.cron },
}),
);
await triggerGatewayStart(onMock, {
config: api.config,
getCron: () => harness.cron,
});
expect(harness.listCalls).toBe(1);
@@ -1140,20 +1129,16 @@ describe("gateway startup reconciliation", () => {
pluginConfig: {},
logger,
runtime: {},
registerHook: (event: string, handler: Parameters<typeof registerInternalHook>[1]) => {
registerInternalHook(event, handler);
},
registerHook: () => {},
on: onMock,
};
try {
registerShortTermPromotionDreamingForTest(api);
await triggerInternalHook(
createInternalHookEvent("gateway", "startup", "gateway:startup", {
cfg: api.config,
deps: { cron: harness.cron },
}),
);
await triggerGatewayStart(onMock, {
config: api.config,
getCron: () => harness.cron,
});
const sessionKey = "agent:main:main";
enqueueSystemEvent(constants.DREAMING_SYSTEM_EVENT_TEXT, {
@@ -1207,20 +1192,16 @@ describe("gateway startup reconciliation", () => {
pluginConfig: {},
logger,
runtime: {},
registerHook: (event: string, handler: Parameters<typeof registerInternalHook>[1]) => {
registerInternalHook(event, handler);
},
registerHook: () => {},
on: onMock,
};
try {
registerShortTermPromotionDreamingForTest(api);
await triggerInternalHook(
createInternalHookEvent("gateway", "startup", "gateway:startup", {
cfg: api.config,
deps: { cron: harness.cron },
}),
);
await triggerGatewayStart(onMock, {
config: api.config,
getCron: () => harness.cron,
});
enqueueSystemEvent(constants.DREAMING_SYSTEM_EVENT_TEXT, {
sessionKey: "agent:main:main",
@@ -1242,7 +1223,7 @@ describe("gateway startup reconciliation", () => {
}
});
it("does not emit the cron-unavailable warning on gateway:startup when deps.cron is missing (regression #69939)", async () => {
it("does not emit the cron-unavailable warning on gateway_start when cron is missing (regression #69939)", async () => {
clearInternalHooks();
const logger = createLogger();
const api: DreamingPluginApiTestDouble = {
@@ -1250,43 +1231,38 @@ describe("gateway startup reconciliation", () => {
pluginConfig: {},
logger,
runtime: {},
registerHook: (event: string, handler: Parameters<typeof registerInternalHook>[1]) => {
registerInternalHook(event, handler);
},
registerHook: () => {},
on: vi.fn(),
};
try {
registerShortTermPromotionDreamingForTest(api);
// Simulate the startup race: gateway:startup fires before deps.cron is attached.
await triggerInternalHook(
createInternalHookEvent("gateway", "startup", "gateway:startup", {
cfg: {
hooks: { internal: { enabled: true } },
plugins: {
entries: {
"memory-core": {
config: {
dreaming: {
enabled: true,
frequency: "15 4 * * *",
timezone: "UTC",
},
await triggerGatewayStart(api.on, {
config: {
hooks: { internal: { enabled: true } },
plugins: {
entries: {
"memory-core": {
config: {
dreaming: {
enabled: true,
frequency: "15 4 * * *",
timezone: "UTC",
},
},
},
},
} as OpenClawConfig,
deps: {},
}),
);
},
} as OpenClawConfig,
getCron: () => undefined,
});
expect(logger.warn).not.toHaveBeenCalledWith(
expect.stringContaining("cron service unavailable"),
);
// The startup-path log should be demoted to debug instead.
expect(logger.debug).toHaveBeenCalledWith(
expect.stringContaining("cron service not yet available at gateway:startup"),
expect.stringContaining("cron service not yet available at gateway_start"),
);
} finally {
clearInternalHooks();
@@ -1316,21 +1292,17 @@ describe("gateway startup reconciliation", () => {
pluginConfig: {},
logger,
runtime: {},
registerHook: (event: string, handler: Parameters<typeof registerInternalHook>[1]) => {
registerInternalHook(event, handler);
},
registerHook: () => {},
on: onMock,
};
try {
registerShortTermPromotionDreamingForTest(api);
// Startup without cron — must stay silent on warn.
await triggerInternalHook(
createInternalHookEvent("gateway", "startup", "gateway:startup", {
cfg: api.config,
deps: {},
}),
);
await triggerGatewayStart(onMock, {
config: api.config,
getCron: () => undefined,
});
expect(logger.warn).not.toHaveBeenCalled();
// Now a runtime heartbeat reconciliation happens and cron is still missing

View File

@@ -21,7 +21,6 @@ import { writeDeepDreamingReport } from "./dreaming-markdown.js";
import { generateAndAppendDreamNarrative, type NarrativePhaseData } from "./dreaming-narrative.js";
import { runDreamingSweepPhases } from "./dreaming-phases.js";
import {
asRecord,
formatErrorMessage,
includesSystemEventToken,
normalizeTrimmedString,
@@ -95,11 +94,6 @@ type CronServiceLike = {
remove: (id: string) => Promise<{ removed?: boolean }>;
};
type StartupCronSourceRefs = {
context: Record<string, unknown>;
deps: Record<string, unknown> | null;
};
export type ShortTermPromotionDreamingConfig = {
enabled: boolean;
cron: string;
@@ -311,45 +305,8 @@ function resolveCronServiceFromCandidate(candidate: unknown): CronServiceLike |
return cron as CronServiceLike;
}
function resolveStartupCronSourceFromEvent(event: unknown): StartupCronSourceRefs | null {
const payload = asRecord(event);
if (!payload) {
return null;
}
if (payload.type !== "gateway" || payload.action !== "startup") {
return null;
}
const context = asRecord(payload.context);
if (!context) {
return null;
}
return { context, deps: asRecord(context.deps) };
}
function resolveCronServiceFromStartupSource(
source: StartupCronSourceRefs | null,
): CronServiceLike | null {
if (!source) {
return null;
}
return (
resolveCronServiceFromCandidate(source.context.cron) ??
resolveCronServiceFromCandidate(source.deps?.cron)
);
}
function resolveCronServiceFromStartupEvent(event: unknown): CronServiceLike | null {
return resolveCronServiceFromStartupSource(resolveStartupCronSourceFromEvent(event));
}
function resolveStartupConfigFromEvent(event: unknown, fallback: OpenClawConfig): OpenClawConfig {
const startupEvent = asRecord(event);
const startupContext = asRecord(startupEvent?.context);
const startupCfg = asRecord(startupContext?.cfg);
if (!startupCfg) {
return fallback;
}
return startupCfg as OpenClawConfig;
function resolveCronServiceFromGatewayContext(context: { getCron?: () => unknown } | undefined) {
return resolveCronServiceFromCandidate(context?.getCron?.());
}
function resolveDreamingTriggerSessionKeys(sessionKey?: string): string[] {
@@ -675,7 +632,7 @@ export async function runShortTermDreamingPromotionIfTriggered(params: {
}
export function registerShortTermPromotionDreaming(api: OpenClawPluginApi): void {
let startupCronSource: StartupCronSourceRefs | null = null;
let resolveStartupCron: (() => CronServiceLike | null) | null = null;
let unavailableCronWarningEmitted = false;
let lastRuntimeReconcileAtMs = 0;
let lastRuntimeConfigKey: string | null = null;
@@ -699,12 +656,11 @@ export function registerShortTermPromotionDreaming(api: OpenClawPluginApi): void
const reconcileManagedDreamingCron = async (params: {
reason: "startup" | "runtime";
startupEvent?: unknown;
startupConfig?: OpenClawConfig;
startupCron?: (() => CronServiceLike | null) | null;
}): Promise<ShortTermPromotionDreamingConfig> => {
const startupCfg =
params.reason === "startup" && params.startupEvent !== undefined
? resolveStartupConfigFromEvent(params.startupEvent, api.config)
: api.config;
params.reason === "startup" ? (params.startupConfig ?? api.config) : api.config;
const config = resolveShortTermPromotionDreamingConfig({
pluginConfig:
resolveMemoryCorePluginConfig(startupCfg) ??
@@ -712,20 +668,18 @@ export function registerShortTermPromotionDreaming(api: OpenClawPluginApi): void
api.pluginConfig,
cfg: startupCfg,
});
if (params.reason === "startup" && params.startupEvent !== undefined) {
startupCronSource = resolveStartupCronSourceFromEvent(params.startupEvent);
if (params.reason === "startup") {
resolveStartupCron = params.startupCron ?? null;
}
const cron = resolveCronServiceFromStartupSource(startupCronSource);
const cron = resolveStartupCron?.() ?? null;
const configKey = runtimeConfigKey(config);
if (!cron && config.enabled && !unavailableCronWarningEmitted) {
// The gateway emits `gateway:startup` via a deferred setTimeout, and
// `deps.cron` may not be attached to the event context yet when memory-core's
// startup hook fires (see issue #69939). Avoid logging a confusing warning on
// the startup path — the runtime reconciliation path (heartbeat-driven) will
// still warn if the cron service remains unavailable after boot.
// Avoid a noisy startup-path warning when the gateway has not exposed cron yet.
// The runtime reconciliation path (heartbeat-driven) will still warn if the
// cron service remains unavailable after boot.
if (params.reason === "startup") {
api.logger.debug?.(
"memory-core: cron service not yet available at gateway:startup; deferring to runtime reconciliation.",
"memory-core: cron service not yet available at gateway_start; deferring to runtime reconciliation.",
);
} else {
api.logger.warn(
@@ -760,22 +714,19 @@ export function registerShortTermPromotionDreaming(api: OpenClawPluginApi): void
return config;
};
api.registerHook(
"gateway:startup",
async (event: unknown) => {
try {
await reconcileManagedDreamingCron({
reason: "startup",
startupEvent: event,
});
} catch (err) {
api.logger.error(
`memory-core: dreaming startup reconciliation failed: ${formatErrorMessage(err)}`,
);
}
},
{ name: "memory-core-short-term-dreaming-cron" },
);
api.on("gateway_start", async (_event, ctx) => {
try {
await reconcileManagedDreamingCron({
reason: "startup",
startupConfig: ctx.config,
startupCron: () => resolveCronServiceFromGatewayContext(ctx),
});
} catch (err) {
api.logger.error(
`memory-core: dreaming startup reconciliation failed: ${formatErrorMessage(err)}`,
);
}
});
api.on("before_agent_reply", async (event, ctx) => {
try {
@@ -811,7 +762,7 @@ export const __testing = {
buildManagedDreamingCronJob,
buildManagedDreamingPatch,
isManagedDreamingJob,
resolveCronServiceFromStartupEvent,
resolveCronServiceFromGatewayContext,
constants: {
MANAGED_DREAMING_CRON_NAME,
MANAGED_DREAMING_CRON_TAG,