fix(memory-core): retry dreaming cron startup reconciliation (#73493)

Co-authored-by: Alex Knight <15041791+amknight@users.noreply.github.com>
This commit is contained in:
Alex Knight
2026-04-28 21:15:23 +10:00
committed by GitHub
parent 2ccdbc7dd9
commit e84ebeafbd
4 changed files with 304 additions and 3 deletions

View File

@@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Plugins/media: auto-enable provider plugins referenced by `agents.defaults.imageGenerationModel`, `videoGenerationModel`, and `musicGenerationModel` primary/fallback refs, so configured Google and MiniMax media providers do not stay disabled behind a restrictive plugin allowlist. Thanks @vincentkoc.
- Memory-core/dreaming: retry managed dreaming cron registration after startup when the cron service is not reachable yet, so the scheduled Memory Dreaming Promotion sweep recovers without waiting for heartbeat traffic. Fixes #72841. Thanks @amknight.
## 2026.4.27

View File

@@ -59,7 +59,11 @@ async function writeDailyMemoryNote(
function createCronHarness(
initialJobs: CronJobLike[] = [],
opts?: { removeResult?: "boolean" | "unknown"; removeThrowsForIds?: string[] },
opts?: {
listThrowsForFirstCalls?: number;
removeResult?: "boolean" | "unknown";
removeThrowsForIds?: string[];
},
) {
const jobs: CronJobLike[] = [...initialJobs];
let listCalls = 0;
@@ -70,6 +74,9 @@ function createCronHarness(
const cron: CronParam = {
async list() {
listCalls += 1;
if (opts?.listThrowsForFirstCalls && listCalls <= opts.listThrowsForFirstCalls) {
throw new Error(`list failed on call ${listCalls}`);
}
return jobs.map((job) => ({
...job,
...(job.schedule ? { schedule: { ...job.schedule } } : {}),
@@ -173,6 +180,22 @@ function getGatewayStartHandler(
) => Promise<unknown>;
}
function getGatewayStopHandler(
onMock: ReturnType<typeof vi.fn>,
): (
event: { reason?: string },
ctx: { config?: OpenClawConfig; workspaceDir?: string; getCron?: () => unknown },
) => Promise<unknown> | void {
const call = onMock.mock.calls.find(([eventName]) => eventName === "gateway_stop");
if (!call) {
throw new Error("gateway_stop hook was not registered");
}
return call[1] as (
event: { reason?: string },
ctx: { config?: OpenClawConfig; workspaceDir?: string; getCron?: () => unknown },
) => Promise<unknown> | void;
}
async function triggerGatewayStart(
onMock: ReturnType<typeof vi.fn>,
ctx: { config?: OpenClawConfig; workspaceDir?: string; getCron?: () => unknown },
@@ -180,6 +203,13 @@ async function triggerGatewayStart(
await getGatewayStartHandler(onMock)({ port: 18789 }, ctx);
}
async function triggerGatewayStop(
onMock: ReturnType<typeof vi.fn>,
ctx: { config?: OpenClawConfig; workspaceDir?: string; getCron?: () => unknown } = {},
): Promise<void> {
await getGatewayStopHandler(onMock)({ reason: "test" }, ctx);
}
function registerShortTermPromotionDreamingForTest(api: DreamingPluginApiTestDouble): void {
registerShortTermPromotionDreaming(api as unknown as DreamingPluginApi);
}
@@ -1332,6 +1362,191 @@ describe("gateway startup reconciliation", () => {
}
});
it("retries startup cron reconciliation until cron is available without a heartbeat (regression #72841)", async () => {
vi.useFakeTimers();
clearInternalHooks();
const logger = createLogger();
const harness = createCronHarness();
const onMock = vi.fn();
const api: DreamingPluginApiTestDouble = {
config: {
plugins: {
entries: {
"memory-core": {
config: {
dreaming: {
enabled: true,
frequency: "15 4 * * *",
timezone: "UTC",
},
},
},
},
},
},
pluginConfig: {},
logger,
runtime: {},
on: onMock,
};
try {
registerShortTermPromotionDreamingForTest(api);
let cronAvailable = false;
await triggerGatewayStart(onMock, {
config: api.config,
getCron: () => (cronAvailable ? harness.cron : undefined),
});
expect(harness.addCalls).toHaveLength(0);
expect(logger.debug).toHaveBeenCalledWith(
expect.stringContaining("cron service not yet available at gateway_start"),
);
await vi.advanceTimersByTimeAsync(constants.STARTUP_CRON_RETRY_DELAY_MS);
expect(harness.addCalls).toHaveLength(0);
expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining("cron service unavailable"));
cronAvailable = true;
await vi.advanceTimersByTimeAsync(constants.STARTUP_CRON_RETRY_DELAY_MS);
expect(harness.addCalls).toHaveLength(1);
expect(harness.addCalls[0]).toMatchObject({
name: "Memory Dreaming Promotion",
schedule: {
kind: "cron",
expr: "15 4 * * *",
tz: "UTC",
},
sessionTarget: "isolated",
payload: {
kind: "agentTurn",
message: constants.DREAMING_SYSTEM_EVENT_TEXT,
lightContext: true,
},
});
} finally {
vi.useRealTimers();
clearInternalHooks();
}
});
it("does not reschedule startup cron retry from stale enabled config after runtime config disables dreaming", async () => {
vi.useFakeTimers();
clearInternalHooks();
const logger = createLogger();
const harness = createCronHarness([], { listThrowsForFirstCalls: 1 });
const onMock = vi.fn();
const api: DreamingPluginApiTestDouble = {
config: {
plugins: {
entries: {
"memory-core": {
config: {
dreaming: {
enabled: true,
frequency: "15 4 * * *",
timezone: "UTC",
},
},
},
},
},
},
pluginConfig: {},
logger,
runtime: {},
on: onMock,
};
try {
registerShortTermPromotionDreamingForTest(api);
let cronAvailable = false;
await triggerGatewayStart(onMock, {
config: api.config,
getCron: () => (cronAvailable ? harness.cron : undefined),
});
api.config = {
plugins: {
entries: {
"memory-core": {
config: {
dreaming: {
enabled: false,
frequency: "15 4 * * *",
timezone: "UTC",
},
},
},
},
},
} as OpenClawConfig;
cronAvailable = true;
await vi.advanceTimersByTimeAsync(constants.STARTUP_CRON_RETRY_DELAY_MS);
await vi.advanceTimersByTimeAsync(constants.STARTUP_CRON_RETRY_DELAY_MS);
expect(logger.error).toHaveBeenCalledWith(
expect.stringContaining("deferred dreaming cron retry failed"),
);
expect(harness.listCalls).toBe(1);
expect(harness.addCalls).toHaveLength(0);
} finally {
vi.useRealTimers();
clearInternalHooks();
}
});
it("clears pending startup cron retry on gateway stop", async () => {
vi.useFakeTimers();
clearInternalHooks();
const logger = createLogger();
const harness = createCronHarness();
const onMock = vi.fn();
const api: DreamingPluginApiTestDouble = {
config: {
plugins: {
entries: {
"memory-core": {
config: {
dreaming: {
enabled: true,
frequency: "15 4 * * *",
timezone: "UTC",
},
},
},
},
},
},
pluginConfig: {},
logger,
runtime: {},
on: onMock,
};
try {
registerShortTermPromotionDreamingForTest(api);
let cronAvailable = false;
await triggerGatewayStart(onMock, {
config: api.config,
getCron: () => (cronAvailable ? harness.cron : undefined),
});
await triggerGatewayStop(onMock);
cronAvailable = true;
await vi.advanceTimersByTimeAsync(
constants.STARTUP_CRON_RETRY_DELAY_MS * constants.STARTUP_CRON_RETRY_MAX_ATTEMPTS,
);
expect(harness.addCalls).toHaveLength(0);
} finally {
vi.useRealTimers();
clearInternalHooks();
}
});
it("uses live runtime config for heartbeat dreaming reconciliation", async () => {
clearInternalHooks();
const logger = createLogger();

View File

@@ -41,6 +41,8 @@ import {
} from "./short-term-promotion.js";
const RUNTIME_CRON_RECONCILE_INTERVAL_MS = 60_000;
const STARTUP_CRON_RETRY_DELAY_MS = 5_000;
const STARTUP_CRON_RETRY_MAX_ATTEMPTS = 12;
const HEARTBEAT_ISOLATED_SESSION_SUFFIX = ":heartbeat";
type Logger = Pick<OpenClawPluginApi["logger"], "info" | "warn" | "error">;
@@ -682,10 +684,44 @@ export function registerShortTermPromotionDreaming(api: OpenClawPluginApi): void
let lastRuntimeReconcileAtMs = 0;
let lastRuntimeConfigKey: string | null = null;
let lastRuntimeCronRef: CronServiceLike | null = null;
let startupCronRetryTimer: ReturnType<typeof setTimeout> | null = null;
let startupCronRetryAttempts = 0;
let disposed = false;
const resolveCurrentConfig = (): OpenClawConfig =>
(api.runtime.config?.current?.() ?? api.config) as OpenClawConfig;
const resolveCurrentDreamingConfig = (): ShortTermPromotionDreamingConfig => {
const cfg = resolveCurrentConfig();
return resolveShortTermPromotionDreamingConfig({
pluginConfig: resolveMemoryCorePluginConfig(cfg),
cfg,
});
};
const clearStartupCronRetry = (): void => {
if (startupCronRetryTimer) {
clearTimeout(startupCronRetryTimer);
startupCronRetryTimer = null;
}
startupCronRetryAttempts = 0;
};
const hasStartupCron = (): boolean => {
try {
return Boolean(resolveStartupCron?.());
} catch {
return false;
}
};
const disposeStartupCronRetry = (): void => {
disposed = true;
clearStartupCronRetry();
gatewayContext = null;
resolveStartupCron = null;
};
const runtimeConfigKey = (config: ShortTermPromotionDreamingConfig): string =>
[
config.enabled ? "enabled" : "disabled",
@@ -756,6 +792,7 @@ export function registerShortTermPromotionDreaming(api: OpenClawPluginApi): void
}
if (cron) {
unavailableCronWarningEmitted = false;
clearStartupCronRetry();
}
if (params.reason === "runtime") {
const now = Date.now();
@@ -780,15 +817,57 @@ export function registerShortTermPromotionDreaming(api: OpenClawPluginApi): void
return config;
};
const scheduleStartupCronRetry = (config: ShortTermPromotionDreamingConfig): void => {
if (disposed || !config.enabled || hasStartupCron()) {
clearStartupCronRetry();
return;
}
if (startupCronRetryTimer || startupCronRetryAttempts >= STARTUP_CRON_RETRY_MAX_ATTEMPTS) {
return;
}
startupCronRetryTimer = setTimeout(() => {
startupCronRetryTimer = null;
if (disposed) {
return;
}
startupCronRetryAttempts += 1;
void reconcileManagedDreamingCron({ reason: "runtime" })
.then((latestConfig) => {
if (disposed || !latestConfig.enabled || hasStartupCron()) {
clearStartupCronRetry();
return;
}
scheduleStartupCronRetry(latestConfig);
})
.catch((err) => {
if (disposed) {
return;
}
api.logger.error(
`memory-core: deferred dreaming cron retry failed: ${formatErrorMessage(err)}`,
);
try {
scheduleStartupCronRetry(resolveCurrentDreamingConfig());
} catch (configErr) {
api.logger.error(
`memory-core: deferred dreaming cron retry config refresh failed: ${formatErrorMessage(configErr)}`,
);
}
});
}, STARTUP_CRON_RETRY_DELAY_MS);
};
api.on("gateway_start", async (_event, ctx) => {
disposed = false;
// Store the gateway context for runtime cron resolution retries.
gatewayContext = ctx as unknown as { getCron?: () => CronServiceLike | null };
try {
await reconcileManagedDreamingCron({
const config = await reconcileManagedDreamingCron({
reason: "startup",
startupConfig: ctx.config,
startupCron: () => resolveCronServiceFromGatewayContext(ctx),
});
scheduleStartupCronRetry(config);
} catch (err) {
api.logger.error(
`memory-core: dreaming startup reconciliation failed: ${formatErrorMessage(err)}`,
@@ -796,6 +875,10 @@ export function registerShortTermPromotionDreaming(api: OpenClawPluginApi): void
}
});
api.on("gateway_stop", () => {
disposeStartupCronRetry();
});
api.on("before_agent_reply", async (event, ctx) => {
try {
if (ctx.trigger !== "heartbeat" && ctx.trigger !== "cron") {
@@ -846,5 +929,7 @@ export const __testing = {
DEFAULT_DREAMING_MIN_RECALL_COUNT: DEFAULT_MEMORY_DREAMING_MIN_RECALL_COUNT,
DEFAULT_DREAMING_MIN_UNIQUE_QUERIES: DEFAULT_MEMORY_DREAMING_MIN_UNIQUE_QUERIES,
DEFAULT_DREAMING_RECENCY_HALF_LIFE_DAYS: DEFAULT_MEMORY_DREAMING_RECENCY_HALF_LIFE_DAYS,
STARTUP_CRON_RETRY_DELAY_MS,
STARTUP_CRON_RETRY_MAX_ATTEMPTS,
},
};

View File

@@ -41,7 +41,7 @@ const BUNDLED_TYPED_HOOK_REGISTRATION_GUARDS = {
"subagent_ended",
"subagent_spawning",
],
"extensions/memory-core/src/dreaming.ts": ["before_agent_reply", "gateway_start"],
"extensions/memory-core/src/dreaming.ts": ["before_agent_reply", "gateway_start", "gateway_stop"],
"extensions/memory-lancedb/index.ts": ["agent_end", "before_prompt_build", "session_end"],
"extensions/skill-workshop/index.ts": ["agent_end", "before_prompt_build"],
"extensions/thread-ownership/index.ts": ["message_received", "message_sending"],