diff --git a/CHANGELOG.md b/CHANGELOG.md index 69788a0c2e5..8e12047e584 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Plugins/media: auto-enable provider plugins referenced by `agents.defaults.imageGenerationModel`, `videoGenerationModel`, and `musicGenerationModel` primary/fallback refs, so configured Google and MiniMax media providers do not stay disabled behind a restrictive plugin allowlist. Thanks @vincentkoc. +- Memory-core/dreaming: retry managed dreaming cron registration after startup when the cron service is not reachable yet, so the scheduled Memory Dreaming Promotion sweep recovers without waiting for heartbeat traffic. Fixes #72841. Thanks @amknight. ## 2026.4.27 diff --git a/extensions/memory-core/src/dreaming.test.ts b/extensions/memory-core/src/dreaming.test.ts index 5dc96c444d3..14553eac822 100644 --- a/extensions/memory-core/src/dreaming.test.ts +++ b/extensions/memory-core/src/dreaming.test.ts @@ -59,7 +59,11 @@ async function writeDailyMemoryNote( function createCronHarness( initialJobs: CronJobLike[] = [], - opts?: { removeResult?: "boolean" | "unknown"; removeThrowsForIds?: string[] }, + opts?: { + listThrowsForFirstCalls?: number; + removeResult?: "boolean" | "unknown"; + removeThrowsForIds?: string[]; + }, ) { const jobs: CronJobLike[] = [...initialJobs]; let listCalls = 0; @@ -70,6 +74,9 @@ function createCronHarness( const cron: CronParam = { async list() { listCalls += 1; + if (opts?.listThrowsForFirstCalls && listCalls <= opts.listThrowsForFirstCalls) { + throw new Error(`list failed on call ${listCalls}`); + } return jobs.map((job) => ({ ...job, ...(job.schedule ? { schedule: { ...job.schedule } } : {}), @@ -173,6 +180,22 @@ function getGatewayStartHandler( ) => Promise; } +function getGatewayStopHandler( + onMock: ReturnType, +): ( + event: { reason?: string }, + ctx: { config?: OpenClawConfig; workspaceDir?: string; getCron?: () => unknown }, +) => Promise | void { + const call = onMock.mock.calls.find(([eventName]) => eventName === "gateway_stop"); + if (!call) { + throw new Error("gateway_stop hook was not registered"); + } + return call[1] as ( + event: { reason?: string }, + ctx: { config?: OpenClawConfig; workspaceDir?: string; getCron?: () => unknown }, + ) => Promise | void; +} + async function triggerGatewayStart( onMock: ReturnType, ctx: { config?: OpenClawConfig; workspaceDir?: string; getCron?: () => unknown }, @@ -180,6 +203,13 @@ async function triggerGatewayStart( await getGatewayStartHandler(onMock)({ port: 18789 }, ctx); } +async function triggerGatewayStop( + onMock: ReturnType, + ctx: { config?: OpenClawConfig; workspaceDir?: string; getCron?: () => unknown } = {}, +): Promise { + await getGatewayStopHandler(onMock)({ reason: "test" }, ctx); +} + function registerShortTermPromotionDreamingForTest(api: DreamingPluginApiTestDouble): void { registerShortTermPromotionDreaming(api as unknown as DreamingPluginApi); } @@ -1332,6 +1362,191 @@ describe("gateway startup reconciliation", () => { } }); + it("retries startup cron reconciliation until cron is available without a heartbeat (regression #72841)", async () => { + vi.useFakeTimers(); + clearInternalHooks(); + const logger = createLogger(); + const harness = createCronHarness(); + const onMock = vi.fn(); + const api: DreamingPluginApiTestDouble = { + config: { + plugins: { + entries: { + "memory-core": { + config: { + dreaming: { + enabled: true, + frequency: "15 4 * * *", + timezone: "UTC", + }, + }, + }, + }, + }, + }, + pluginConfig: {}, + logger, + runtime: {}, + on: onMock, + }; + + try { + registerShortTermPromotionDreamingForTest(api); + let cronAvailable = false; + await triggerGatewayStart(onMock, { + config: api.config, + getCron: () => (cronAvailable ? harness.cron : undefined), + }); + + expect(harness.addCalls).toHaveLength(0); + expect(logger.debug).toHaveBeenCalledWith( + expect.stringContaining("cron service not yet available at gateway_start"), + ); + + await vi.advanceTimersByTimeAsync(constants.STARTUP_CRON_RETRY_DELAY_MS); + expect(harness.addCalls).toHaveLength(0); + expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining("cron service unavailable")); + + cronAvailable = true; + await vi.advanceTimersByTimeAsync(constants.STARTUP_CRON_RETRY_DELAY_MS); + + expect(harness.addCalls).toHaveLength(1); + expect(harness.addCalls[0]).toMatchObject({ + name: "Memory Dreaming Promotion", + schedule: { + kind: "cron", + expr: "15 4 * * *", + tz: "UTC", + }, + sessionTarget: "isolated", + payload: { + kind: "agentTurn", + message: constants.DREAMING_SYSTEM_EVENT_TEXT, + lightContext: true, + }, + }); + } finally { + vi.useRealTimers(); + clearInternalHooks(); + } + }); + + it("does not reschedule startup cron retry from stale enabled config after runtime config disables dreaming", async () => { + vi.useFakeTimers(); + clearInternalHooks(); + const logger = createLogger(); + const harness = createCronHarness([], { listThrowsForFirstCalls: 1 }); + const onMock = vi.fn(); + const api: DreamingPluginApiTestDouble = { + config: { + plugins: { + entries: { + "memory-core": { + config: { + dreaming: { + enabled: true, + frequency: "15 4 * * *", + timezone: "UTC", + }, + }, + }, + }, + }, + }, + pluginConfig: {}, + logger, + runtime: {}, + on: onMock, + }; + + try { + registerShortTermPromotionDreamingForTest(api); + let cronAvailable = false; + await triggerGatewayStart(onMock, { + config: api.config, + getCron: () => (cronAvailable ? harness.cron : undefined), + }); + + api.config = { + plugins: { + entries: { + "memory-core": { + config: { + dreaming: { + enabled: false, + frequency: "15 4 * * *", + timezone: "UTC", + }, + }, + }, + }, + }, + } as OpenClawConfig; + cronAvailable = true; + + await vi.advanceTimersByTimeAsync(constants.STARTUP_CRON_RETRY_DELAY_MS); + await vi.advanceTimersByTimeAsync(constants.STARTUP_CRON_RETRY_DELAY_MS); + + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining("deferred dreaming cron retry failed"), + ); + expect(harness.listCalls).toBe(1); + expect(harness.addCalls).toHaveLength(0); + } finally { + vi.useRealTimers(); + clearInternalHooks(); + } + }); + + it("clears pending startup cron retry on gateway stop", async () => { + vi.useFakeTimers(); + clearInternalHooks(); + const logger = createLogger(); + const harness = createCronHarness(); + const onMock = vi.fn(); + const api: DreamingPluginApiTestDouble = { + config: { + plugins: { + entries: { + "memory-core": { + config: { + dreaming: { + enabled: true, + frequency: "15 4 * * *", + timezone: "UTC", + }, + }, + }, + }, + }, + }, + pluginConfig: {}, + logger, + runtime: {}, + on: onMock, + }; + + try { + registerShortTermPromotionDreamingForTest(api); + let cronAvailable = false; + await triggerGatewayStart(onMock, { + config: api.config, + getCron: () => (cronAvailable ? harness.cron : undefined), + }); + + await triggerGatewayStop(onMock); + cronAvailable = true; + await vi.advanceTimersByTimeAsync( + constants.STARTUP_CRON_RETRY_DELAY_MS * constants.STARTUP_CRON_RETRY_MAX_ATTEMPTS, + ); + + expect(harness.addCalls).toHaveLength(0); + } finally { + vi.useRealTimers(); + clearInternalHooks(); + } + }); + it("uses live runtime config for heartbeat dreaming reconciliation", async () => { clearInternalHooks(); const logger = createLogger(); diff --git a/extensions/memory-core/src/dreaming.ts b/extensions/memory-core/src/dreaming.ts index 9686c1338dd..fccc59a1ce3 100644 --- a/extensions/memory-core/src/dreaming.ts +++ b/extensions/memory-core/src/dreaming.ts @@ -41,6 +41,8 @@ import { } from "./short-term-promotion.js"; const RUNTIME_CRON_RECONCILE_INTERVAL_MS = 60_000; +const STARTUP_CRON_RETRY_DELAY_MS = 5_000; +const STARTUP_CRON_RETRY_MAX_ATTEMPTS = 12; const HEARTBEAT_ISOLATED_SESSION_SUFFIX = ":heartbeat"; type Logger = Pick; @@ -682,10 +684,44 @@ export function registerShortTermPromotionDreaming(api: OpenClawPluginApi): void let lastRuntimeReconcileAtMs = 0; let lastRuntimeConfigKey: string | null = null; let lastRuntimeCronRef: CronServiceLike | null = null; + let startupCronRetryTimer: ReturnType | null = null; + let startupCronRetryAttempts = 0; + let disposed = false; const resolveCurrentConfig = (): OpenClawConfig => (api.runtime.config?.current?.() ?? api.config) as OpenClawConfig; + const resolveCurrentDreamingConfig = (): ShortTermPromotionDreamingConfig => { + const cfg = resolveCurrentConfig(); + return resolveShortTermPromotionDreamingConfig({ + pluginConfig: resolveMemoryCorePluginConfig(cfg), + cfg, + }); + }; + + const clearStartupCronRetry = (): void => { + if (startupCronRetryTimer) { + clearTimeout(startupCronRetryTimer); + startupCronRetryTimer = null; + } + startupCronRetryAttempts = 0; + }; + + const hasStartupCron = (): boolean => { + try { + return Boolean(resolveStartupCron?.()); + } catch { + return false; + } + }; + + const disposeStartupCronRetry = (): void => { + disposed = true; + clearStartupCronRetry(); + gatewayContext = null; + resolveStartupCron = null; + }; + const runtimeConfigKey = (config: ShortTermPromotionDreamingConfig): string => [ config.enabled ? "enabled" : "disabled", @@ -756,6 +792,7 @@ export function registerShortTermPromotionDreaming(api: OpenClawPluginApi): void } if (cron) { unavailableCronWarningEmitted = false; + clearStartupCronRetry(); } if (params.reason === "runtime") { const now = Date.now(); @@ -780,15 +817,57 @@ export function registerShortTermPromotionDreaming(api: OpenClawPluginApi): void return config; }; + const scheduleStartupCronRetry = (config: ShortTermPromotionDreamingConfig): void => { + if (disposed || !config.enabled || hasStartupCron()) { + clearStartupCronRetry(); + return; + } + if (startupCronRetryTimer || startupCronRetryAttempts >= STARTUP_CRON_RETRY_MAX_ATTEMPTS) { + return; + } + startupCronRetryTimer = setTimeout(() => { + startupCronRetryTimer = null; + if (disposed) { + return; + } + startupCronRetryAttempts += 1; + void reconcileManagedDreamingCron({ reason: "runtime" }) + .then((latestConfig) => { + if (disposed || !latestConfig.enabled || hasStartupCron()) { + clearStartupCronRetry(); + return; + } + scheduleStartupCronRetry(latestConfig); + }) + .catch((err) => { + if (disposed) { + return; + } + api.logger.error( + `memory-core: deferred dreaming cron retry failed: ${formatErrorMessage(err)}`, + ); + try { + scheduleStartupCronRetry(resolveCurrentDreamingConfig()); + } catch (configErr) { + api.logger.error( + `memory-core: deferred dreaming cron retry config refresh failed: ${formatErrorMessage(configErr)}`, + ); + } + }); + }, STARTUP_CRON_RETRY_DELAY_MS); + }; + api.on("gateway_start", async (_event, ctx) => { + disposed = false; // Store the gateway context for runtime cron resolution retries. gatewayContext = ctx as unknown as { getCron?: () => CronServiceLike | null }; try { - await reconcileManagedDreamingCron({ + const config = await reconcileManagedDreamingCron({ reason: "startup", startupConfig: ctx.config, startupCron: () => resolveCronServiceFromGatewayContext(ctx), }); + scheduleStartupCronRetry(config); } catch (err) { api.logger.error( `memory-core: dreaming startup reconciliation failed: ${formatErrorMessage(err)}`, @@ -796,6 +875,10 @@ export function registerShortTermPromotionDreaming(api: OpenClawPluginApi): void } }); + api.on("gateway_stop", () => { + disposeStartupCronRetry(); + }); + api.on("before_agent_reply", async (event, ctx) => { try { if (ctx.trigger !== "heartbeat" && ctx.trigger !== "cron") { @@ -846,5 +929,7 @@ export const __testing = { DEFAULT_DREAMING_MIN_RECALL_COUNT: DEFAULT_MEMORY_DREAMING_MIN_RECALL_COUNT, DEFAULT_DREAMING_MIN_UNIQUE_QUERIES: DEFAULT_MEMORY_DREAMING_MIN_UNIQUE_QUERIES, DEFAULT_DREAMING_RECENCY_HALF_LIFE_DAYS: DEFAULT_MEMORY_DREAMING_RECENCY_HALF_LIFE_DAYS, + STARTUP_CRON_RETRY_DELAY_MS, + STARTUP_CRON_RETRY_MAX_ATTEMPTS, }, }; diff --git a/src/plugins/contracts/boundary-invariants.test.ts b/src/plugins/contracts/boundary-invariants.test.ts index 40c60ceaf7b..2cc2484ac1f 100644 --- a/src/plugins/contracts/boundary-invariants.test.ts +++ b/src/plugins/contracts/boundary-invariants.test.ts @@ -41,7 +41,7 @@ const BUNDLED_TYPED_HOOK_REGISTRATION_GUARDS = { "subagent_ended", "subagent_spawning", ], - "extensions/memory-core/src/dreaming.ts": ["before_agent_reply", "gateway_start"], + "extensions/memory-core/src/dreaming.ts": ["before_agent_reply", "gateway_start", "gateway_stop"], "extensions/memory-lancedb/index.ts": ["agent_end", "before_prompt_build", "session_end"], "extensions/skill-workshop/index.ts": ["agent_end", "before_prompt_build"], "extensions/thread-ownership/index.ts": ["message_received", "message_sending"],