From b4e9f1bd1c9508c2d36c2810ec2537b672845cf6 Mon Sep 17 00:00:00 2001 From: Ke Wang <30745273+KeWang0622@users.noreply.github.com> Date: Tue, 28 Apr 2026 00:42:07 -0500 Subject: [PATCH] fix(memory-core): cap detached dream narratives (#73287) Cap detached Dream Diary narrative subagent runs across cron dreaming sweeps so multi-workspace runs cannot fan out unbounded subagent sessions. Adds regression coverage that queued detached narratives resume and clean up, plus a unit-fast lane correction for the security symlink audit test. --- CHANGELOG.md | 1 + .../src/dreaming-narrative.test.ts | 115 ++++++++++++++++++ .../memory-core/src/dreaming-narrative.ts | 50 ++++++++ extensions/memory-core/src/dreaming-phases.ts | 42 +++---- extensions/memory-core/src/dreaming.test.ts | 12 +- extensions/memory-core/src/dreaming.ts | 24 ++-- test/vitest/vitest.unit-fast-paths.mjs | 1 - 7 files changed, 208 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0267d3d7170..512e59c45ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -69,6 +69,7 @@ Docs: https://docs.openclaw.ai - Gateway/Docker: keep config-triggered restarts in-process inside containers instead of spawning a detached child and exiting PID 1 cleanly, so Docker Swarm and other on-failure supervisors do not leave the service stuck at 0/1 replicas. Fixes #73178. Thanks @du-nguyen-IT007. - CLI/tasks: ship the task-registry control runtime in npm packages so `openclaw tasks cancel` can load ACP/subagent cancellation helpers from published builds. Fixes #68997. Thanks @1OAKDesign. - Channels/Telegram: preserve unsent generated media after partial reply streaming has already delivered the text, so `image_generate` outputs still reach Telegram as photos instead of being dropped from the final payload. Fixes #73253. Thanks @mlaihk. +- Memory-core/dreaming: cap detached Dream Diary narrative subagents across cron sweeps so multi-workspace dreaming no longer fans out unbounded subagent sessions, lock contention, and cascading narrative timeouts. Fixes #73198. (#73287) Thanks @KeWang0622. - Export/session: keep inline export HTML scripts and vendor libraries injected after template formatting so generated session exports open with the app code, markdown renderer, and syntax highlighter present. Fixes #41862 and #49957; carries forward #41861 and #68947. Thanks @briannewman, @martenzi, and @armanddp. - Agents/ACPX: stage the patched Claude ACP adapter as an ACPX runtime dependency and route known Codex/Claude ACP commands through local wrappers, so Gateway runtime no longer depends on live `npx` adapter resolution. Fixes #73202. Thanks @joerod26. - Memory/compaction: let pre-compaction memory flush use an exact `agents.defaults.compaction.memoryFlush.model` override such as `ollama/qwen3:8b` without inheriting the active session fallback chain, so local housekeeping can avoid paid conversation models. Fixes #53772. Thanks @limen96. diff --git a/extensions/memory-core/src/dreaming-narrative.test.ts b/extensions/memory-core/src/dreaming-narrative.test.ts index 82f2404dd51..4ea7b739659 100644 --- a/extensions/memory-core/src/dreaming-narrative.test.ts +++ b/extensions/memory-core/src/dreaming-narrative.test.ts @@ -21,6 +21,7 @@ import { formatBackfillDiaryDate, generateAndAppendDreamNarrative, removeBackfillDiaryEntries, + runDetachedDreamNarrative, type NarrativePhaseData, writeBackfillDiaryEntries, } from "./dreaming-narrative.js"; @@ -1003,3 +1004,117 @@ describe("generateAndAppendDreamNarrative", () => { expect(subagent.deleteSession.mock.calls[1]?.[0]?.sessionKey).toBe(secondSessionKey); }); }); + +describe("runDetachedDreamNarrative", () => { + type Deferred = { promise: Promise; resolve: (v: T) => void }; + function deferred(): Deferred { + let resolve!: (v: T) => void; + const promise = new Promise((r) => { + resolve = r; + }); + return { promise, resolve }; + } + + function createBlockingSubagent() { + const runDeferreds: Array> = []; + const subagent = { + run: vi.fn(() => { + const d = deferred<{ runId: string }>(); + runDeferreds.push(d); + return d.promise; + }), + // Resolve the rest of the pipeline as a no-op so a single resolve() + // on a deferred unblocks the slot for the queued task. + waitForRun: vi.fn().mockResolvedValue({ status: "timeout" }), + getSessionMessages: vi.fn().mockResolvedValue({ messages: [] }), + deleteSession: vi.fn().mockResolvedValue(undefined), + }; + return { subagent, runDeferreds }; + } + + function createMockLogger() { + return { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; + } + + async function drainMicrotasks(rounds = 30): Promise { + for (let i = 0; i < rounds; i += 1) { + await Promise.resolve(); + } + } + + it("caps the number of in-flight detached narratives at 3", async () => { + const { subagent, runDeferreds } = createBlockingSubagent(); + const workspaceDir = await createTempWorkspace("openclaw-dreaming-detach-"); + const logger = createMockLogger(); + + for (let i = 0; i < 5; i += 1) { + runDetachedDreamNarrative({ + subagent, + workspaceDir, + data: { phase: "light", snippets: [`fragment-${i}`] }, + nowMs: Date.parse("2026-04-28T03:00:00Z"), + logger, + }); + } + + await drainMicrotasks(); + + // Only the first 3 should have reached subagent.run; the rest are queued. + expect(subagent.run).toHaveBeenCalledTimes(3); + + // Drain the rest so module-level concurrency state does not leak into + // subsequent tests. The mock subagent creates a new deferred every time + // queued tasks acquire a slot, so loop until no new deferreds appear. + for (let iter = 0; iter < 10; iter += 1) { + const before = runDeferreds.length; + for (const d of runDeferreds) { + d.resolve({ runId: "drain" }); + } + if (before >= 5) { + break; + } + await vi.waitFor(() => { + expect(runDeferreds.length).toBeGreaterThan(before); + }); + } + for (const d of runDeferreds) { + d.resolve({ runId: "drain" }); + } + await vi.waitFor(() => { + expect(subagent.deleteSession).toHaveBeenCalledTimes(5); + }); + expect(subagent.run).toHaveBeenCalledTimes(5); + expect(subagent.waitForRun).toHaveBeenCalledTimes(5); + }); + + it("swallows underlying narrative errors instead of leaving an unhandled rejection", async () => { + const error = new Error("boom"); + const subagent = { + run: vi.fn().mockRejectedValue(error), + waitForRun: vi.fn().mockResolvedValue({ status: "ok" }), + getSessionMessages: vi.fn().mockResolvedValue({ messages: [] }), + deleteSession: vi.fn().mockResolvedValue(undefined), + }; + const logger = createMockLogger(); + const workspaceDir = await createTempWorkspace("openclaw-dreaming-detach-"); + const unhandled = vi.fn(); + process.on("unhandledRejection", unhandled); + + try { + runDetachedDreamNarrative({ + subagent, + workspaceDir, + data: { phase: "light", snippets: ["fragment"] }, + nowMs: Date.parse("2026-04-28T03:00:00Z"), + logger, + }); + + await drainMicrotasks(); + + expect(subagent.run).toHaveBeenCalledOnce(); + expect(unhandled).not.toHaveBeenCalled(); + } finally { + process.off("unhandledRejection", unhandled); + } + }); +}); diff --git a/extensions/memory-core/src/dreaming-narrative.ts b/extensions/memory-core/src/dreaming-narrative.ts index 29fd53b7e5d..04ca492d66d 100644 --- a/extensions/memory-core/src/dreaming-narrative.ts +++ b/extensions/memory-core/src/dreaming-narrative.ts @@ -1045,3 +1045,53 @@ export async function generateAndAppendDreamNarrative(params: { }); } } + +// ── Detached narrative concurrency limit ─────────────────────────────── +// +// Cron-driven dreaming detaches narrative generation across light, REM, and +// deep phases for every workspace, so a 10-workspace cron sweep used to fire +// 30 concurrent narrative subagents at once. Each one holds the session +// write-lock while it runs and burns a model slot, which caused lock +// contention (>30 s) and cascading narrative timeouts (#73198). +// +// `runDetachedDreamNarrative` wraps `generateAndAppendDreamNarrative` with a +// FIFO queue capped at `DETACHED_NARRATIVE_CONCURRENCY` so the total in-flight +// detached narratives across phases/workspaces stays bounded. +const DETACHED_NARRATIVE_CONCURRENCY = 3; + +let activeDetachedNarratives = 0; +const detachedNarrativeQueue: Array<() => void> = []; + +function releaseDetachedNarrativeSlot(): void { + activeDetachedNarratives -= 1; + detachedNarrativeQueue.shift()?.(); +} + +async function acquireDetachedNarrativeSlot(): Promise { + if (activeDetachedNarratives >= DETACHED_NARRATIVE_CONCURRENCY) { + await new Promise((resolve) => { + detachedNarrativeQueue.push(resolve); + }); + } + activeDetachedNarratives += 1; +} + +export function runDetachedDreamNarrative( + params: Parameters[0], +): void { + queueMicrotask(() => { + void (async () => { + await acquireDetachedNarrativeSlot(); + try { + await generateAndAppendDreamNarrative(params); + } catch { + // Detached narratives intentionally swallow errors — callers (cron + // sweeps) cannot recover, and surfacing here would only cause noisy + // unhandled rejections. Logging happens inside + // generateAndAppendDreamNarrative. + } finally { + releaseDetachedNarrativeSlot(); + } + })(); + }); +} diff --git a/extensions/memory-core/src/dreaming-phases.ts b/extensions/memory-core/src/dreaming-phases.ts index 84cb6bee262..948904efa72 100644 --- a/extensions/memory-core/src/dreaming-phases.ts +++ b/extensions/memory-core/src/dreaming-phases.ts @@ -19,7 +19,11 @@ import { } from "openclaw/plugin-sdk/memory-core-host-status"; import type { OpenClawPluginApi } from "openclaw/plugin-sdk/plugin-entry"; import { writeDailyDreamingPhaseBlock } from "./dreaming-markdown.js"; -import { generateAndAppendDreamNarrative, type NarrativePhaseData } from "./dreaming-narrative.js"; +import { + generateAndAppendDreamNarrative, + type NarrativePhaseData, + runDetachedDreamNarrative, +} from "./dreaming-narrative.js"; import { asRecord, formatErrorMessage, normalizeTrimmedString } from "./dreaming-shared.js"; import { filterLiveShortTermRecallEntries, @@ -1574,16 +1578,14 @@ async function runLightDreaming(params: { ...(themes.length > 0 ? { themes } : {}), }; if (params.detachNarratives) { - queueMicrotask(() => { - void generateAndAppendDreamNarrative({ - subagent: params.subagent!, - workspaceDir: params.workspaceDir, - data, - nowMs, - timezone: params.config.timezone, - model: params.config.execution?.model, - logger: params.logger, - }).catch(() => undefined); + runDetachedDreamNarrative({ + subagent: params.subagent, + workspaceDir: params.workspaceDir, + data, + nowMs, + timezone: params.config.timezone, + model: params.config.execution?.model, + logger: params.logger, }); } else { await generateAndAppendDreamNarrative({ @@ -1672,16 +1674,14 @@ async function runRemDreaming(params: { ...(themes.length > 0 ? { themes } : {}), }; if (params.detachNarratives) { - queueMicrotask(() => { - void generateAndAppendDreamNarrative({ - subagent: params.subagent!, - workspaceDir: params.workspaceDir, - data, - nowMs, - timezone: params.config.timezone, - model: params.config.execution?.model, - logger: params.logger, - }).catch(() => undefined); + runDetachedDreamNarrative({ + subagent: params.subagent, + workspaceDir: params.workspaceDir, + data, + nowMs, + timezone: params.config.timezone, + model: params.config.execution?.model, + logger: params.logger, }); } else { await generateAndAppendDreamNarrative({ diff --git a/extensions/memory-core/src/dreaming.test.ts b/extensions/memory-core/src/dreaming.test.ts index d381e8722ef..5dc96c444d3 100644 --- a/extensions/memory-core/src/dreaming.test.ts +++ b/extensions/memory-core/src/dreaming.test.ts @@ -1917,19 +1917,23 @@ describe("short-term dreaming trigger", () => { }); expect(result?.handled).toBe(true); - expect(subagent.run).toHaveBeenCalled(); - expect(subagent.run.mock.calls[0]?.[0]).toMatchObject({ - model: "anthropic/claude-sonnet-4-6", - }); const memoryText = await fs.readFile(path.join(workspaceDir, "MEMORY.md"), "utf-8"); expect(memoryText).toContain("Move backups to S3 Glacier."); + // Detached cron narratives now go through a bounded queue + // (see runDetachedDreamNarrative), so subagent.run lands a few extra + // microtasks after promotion returns. Wait for the full delivery chain + // rather than asserting on the exact tick order. await vi.waitFor(async () => { + expect(subagent.run).toHaveBeenCalled(); expect(subagent.waitForRun).toHaveBeenCalled(); expect(subagent.getSessionMessages).toHaveBeenCalled(); expect(subagent.deleteSession).toHaveBeenCalled(); const dreamsText = await fs.readFile(path.join(workspaceDir, "DREAMS.md"), "utf-8"); expect(dreamsText).toContain("A diary entry."); }); + expect(subagent.run.mock.calls[0]?.[0]).toMatchObject({ + model: "anthropic/claude-sonnet-4-6", + }); }); it("skips dreaming promotion cleanly when limit is zero", async () => { diff --git a/extensions/memory-core/src/dreaming.ts b/extensions/memory-core/src/dreaming.ts index 3023b2e2533..9686c1338dd 100644 --- a/extensions/memory-core/src/dreaming.ts +++ b/extensions/memory-core/src/dreaming.ts @@ -23,7 +23,11 @@ import type { OpenClawPluginApi } from "openclaw/plugin-sdk/plugin-entry"; import { peekSystemEventEntries } from "openclaw/plugin-sdk/system-event-runtime"; import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime"; import { writeDeepDreamingReport } from "./dreaming-markdown.js"; -import { generateAndAppendDreamNarrative, type NarrativePhaseData } from "./dreaming-narrative.js"; +import { + generateAndAppendDreamNarrative, + type NarrativePhaseData, + runDetachedDreamNarrative, +} from "./dreaming-narrative.js"; import { runDreamingSweepPhases } from "./dreaming-phases.js"; import { formatErrorMessage, @@ -631,16 +635,14 @@ export async function runShortTermDreamingPromotionIfTriggered(params: { promotions: applied.appliedCandidates.map((c) => c.snippet).filter(Boolean), }; if (detachNarratives) { - queueMicrotask(() => { - void generateAndAppendDreamNarrative({ - subagent: params.subagent!, - workspaceDir, - data, - nowMs: sweepNowMs, - timezone: params.config.timezone, - model: params.config.execution?.model, - logger: params.logger, - }).catch(() => undefined); + runDetachedDreamNarrative({ + subagent: params.subagent, + workspaceDir, + data, + nowMs: sweepNowMs, + timezone: params.config.timezone, + model: params.config.execution?.model, + logger: params.logger, }); } else { await generateAndAppendDreamNarrative({ diff --git a/test/vitest/vitest.unit-fast-paths.mjs b/test/vitest/vitest.unit-fast-paths.mjs index c11be262245..1c5d77359e4 100644 --- a/test/vitest/vitest.unit-fast-paths.mjs +++ b/test/vitest/vitest.unit-fast-paths.mjs @@ -121,7 +121,6 @@ export const forcedUnitFastTestFiles = [ "src/realtime-voice/session-runtime.test.ts", "src/security/audit-channel-dm-policy.test.ts", "src/security/audit-channel-readonly-resolution.test.ts", - "src/security/audit-config-symlink.test.ts", "src/security/audit-exec-surface.test.ts", "src/security/audit-exec-safe-bins.test.ts", "src/security/audit-extra.sync.test.ts",