From 2055e75f9ff64b4cc78ee374067f9b5c2d40e9bb Mon Sep 17 00:00:00 2001 From: chiyouYCH <563318445@qq.com> Date: Tue, 21 Apr 2026 06:41:11 +0800 Subject: [PATCH] fix(memory-core): prevent dreaming-narrative session leaks (#66358) (#67023) Merged via squash. Prepared head SHA: 51f72b200c3a4f96a5dfc6adac3dc37f23dde53b Co-authored-by: chiyouYCH <26790612+chiyouYCH@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman --- CHANGELOG.md | 1 + .../memory-core/src/dreaming-narrative.ts | 22 +++-- .../memory-core/src/dreaming-phases.test.ts | 68 ++++++++++++- extensions/memory-core/src/dreaming-phases.ts | 98 +++++++++++++------ 4 files changed, 149 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d9bfbb23ade..b373b55305e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai - Agents/gateway tool: extend the agent-facing `gateway` tool's config mutation guard so model-driven `config.patch` and `config.apply` cannot rewrite operator-trusted paths (sandbox, plugin trust, gateway auth/TLS, hook routing and tokens, SSRF policy, MCP servers, workspace filesystem hardening) and cannot bypass the guard by editing per-agent sandbox, tools, or embedded-Pi overrides in place under `agents.list[]`. (#69377) Thanks @eleqtrizit. - Gateway/websocket broadcasts: require `operator.read` (or higher) for chat, agent, and tool-result event frames so pairing-scoped and node-role sessions no longer passively receive session chat content, and scope-gate unknown broadcast events by default. Plugin-defined `plugin.*` broadcasts are scoped to operator.write/admin, and status/transport events (`heartbeat`, `presence`, `tick`, etc.) remain unrestricted. Per-client sequence numbers preserve per-connection monotonicity. (#69373) Thanks @eleqtrizit. - Agents/compaction: always reload embedded Pi resources through an explicit loader and reapply reserve-token overrides so runs without extension factories no longer silently lose compaction settings before session start. (#67146) Thanks @ly85206559. +- Memory-core/dreaming: normalize sweep timestamps and reuse hashed narrative session keys for fallback cleanup so Dreaming narrative sub-sessions stop leaking. (#67023) Thanks @chiyouYCH. ## 2026.4.20 ### Changes diff --git a/extensions/memory-core/src/dreaming-narrative.ts b/extensions/memory-core/src/dreaming-narrative.ts index 2c0381b27fb..c8470b23f7f 100644 --- a/extensions/memory-core/src/dreaming-narrative.ts +++ b/extensions/memory-core/src/dreaming-narrative.ts @@ -177,7 +177,10 @@ async function startNarrativeRunOrFallback(params: { } } -function buildNarrativeSessionKey(params: { +/** + * Build the deterministic subagent session key used for dream narratives. + */ +export function buildNarrativeSessionKey(params: { workspaceDir: string; phase: NarrativePhaseData["phase"]; nowMs: number; @@ -911,7 +914,7 @@ export async function generateAndAppendDreamNarrative(params: { `memory-core: narrative generation failed for ${params.data.phase} phase: ${formatErrorMessage(err)}`, ); } finally { - if (runId && waitStatus === "timeout") { + if (params.subagent && runId && waitStatus === "timeout") { try { const settle = await params.subagent.waitForRun({ runId, @@ -929,12 +932,15 @@ export async function generateAndAppendDreamNarrative(params: { } } - try { - await params.subagent.deleteSession({ sessionKey }); - } catch (cleanupErr) { - params.logger.warn( - `memory-core: narrative session cleanup failed for ${params.data.phase} phase: ${formatErrorMessage(cleanupErr)}`, - ); + // Guard against subagent becoming unavailable mid-flight (throws TypeError without this). + if (params.subagent) { + try { + await params.subagent.deleteSession({ sessionKey }); + } catch (cleanupErr) { + params.logger.warn( + `memory-core: narrative session cleanup failed for ${params.data.phase} phase: ${formatErrorMessage(cleanupErr)}`, + ); + } } await scrubDreamingNarrativeArtifacts(params.logger).catch((scrubErr: unknown) => { diff --git a/extensions/memory-core/src/dreaming-phases.test.ts b/extensions/memory-core/src/dreaming-phases.test.ts index 8b171c98348..0d9e6f75528 100644 --- a/extensions/memory-core/src/dreaming-phases.test.ts +++ b/extensions/memory-core/src/dreaming-phases.test.ts @@ -1,3 +1,4 @@ +import { createHash } from "node:crypto"; import fs from "node:fs/promises"; import path from "node:path"; import type { OpenClawConfig } from "openclaw/plugin-sdk/memory-core"; @@ -8,7 +9,7 @@ import { resolveMemoryRemDreamingConfig, } from "openclaw/plugin-sdk/memory-core-host-status"; import { describe, expect, it, vi } from "vitest"; -import { __testing } from "./dreaming-phases.js"; +import { __testing, runDreamingSweepPhases } from "./dreaming-phases.js"; import { rankShortTermPromotionCandidates, recordShortTermRecalls, @@ -187,6 +188,71 @@ async function readCandidateSnippets(workspaceDir: string, nowIso: string): Prom } describe("memory-core dreaming phases", () => { + it("uses the hashed narrative session key for sweep-level fallback cleanup", async () => { + const workspaceDir = await createDreamingWorkspace(); + await writeDailyNote(workspaceDir, [ + `# ${DREAMING_TEST_DAY}`, + "", + "- Move backups to S3 Glacier.", + "- Keep retention at 365 days.", + ]); + const testConfig: OpenClawConfig = { + ...LIGHT_DREAMING_TEST_CONFIG, + agents: { + defaults: { + workspace: workspaceDir, + userTimezone: "UTC", + }, + }, + plugins: { + entries: { + "memory-core": { + config: { + dreaming: { + enabled: true, + timezone: "UTC", + phases: { + light: { + enabled: true, + limit: 20, + lookbackDays: 2, + }, + rem: { + enabled: false, + limit: 0, + lookbackDays: 2, + }, + }, + }, + }, + }, + }, + }, + }; + const subagent = createMockNarrativeSubagent("The archive hummed softly."); + const logger = { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + const nowMs = Date.parse("2026-04-05T10:05:00.000Z"); + const workspaceHash = createHash("sha1").update(workspaceDir).digest("hex").slice(0, 12); + const expectedSessionKey = `dreaming-narrative-light-${workspaceHash}-${nowMs}`; + + await runDreamingSweepPhases({ + workspaceDir, + cfg: testConfig, + pluginConfig: resolveMemoryCorePluginConfig(testConfig), + logger, + subagent, + nowMs, + }); + + expect(subagent.deleteSession).toHaveBeenCalledTimes(2); + expect(subagent.deleteSession).toHaveBeenNthCalledWith(1, { sessionKey: expectedSessionKey }); + expect(subagent.deleteSession).toHaveBeenNthCalledWith(2, { sessionKey: expectedSessionKey }); + }); + it("does not re-ingest managed light dreaming blocks from daily notes", async () => { const workspaceDir = await createDreamingWorkspace(); await withDreamingTestClock(async () => { diff --git a/extensions/memory-core/src/dreaming-phases.ts b/extensions/memory-core/src/dreaming-phases.ts index 993129c7a70..bc0bbde1878 100644 --- a/extensions/memory-core/src/dreaming-phases.ts +++ b/extensions/memory-core/src/dreaming-phases.ts @@ -2,7 +2,7 @@ import { createHash } from "node:crypto"; import type { Dirent } from "node:fs"; import fs from "node:fs/promises"; import path from "node:path"; -import type { OpenClawConfig, OpenClawPluginApi } from "openclaw/plugin-sdk/memory-core"; +import type { OpenClawPluginApi } from "openclaw/plugin-sdk/memory-core"; import { buildSessionEntry, listSessionFilesForAgent, @@ -17,11 +17,13 @@ import { resolveMemoryDreamingWorkspaces, resolveMemoryLightDreamingConfig, resolveMemoryRemDreamingConfig, - type MemoryLightDreamingConfig, - type MemoryRemDreamingConfig, } from "openclaw/plugin-sdk/memory-core-host-status"; import { writeDailyDreamingPhaseBlock } from "./dreaming-markdown.js"; -import { generateAndAppendDreamNarrative, type NarrativePhaseData } from "./dreaming-narrative.js"; +import { + buildNarrativeSessionKey, + generateAndAppendDreamNarrative, + type NarrativePhaseData, +} from "./dreaming-narrative.js"; import { asRecord, formatErrorMessage, normalizeTrimmedString } from "./dreaming-shared.js"; import { readShortTermRecallEntries, @@ -31,26 +33,39 @@ import { } from "./short-term-promotion.js"; type Logger = Pick; +type DreamingHostConfig = unknown; type DreamingPhaseStorageConfig = { timezone?: string; storage: { mode: "inline" | "separate" | "both"; separateReports: boolean }; }; +type LightDreamingConfig = DreamingPhaseStorageConfig & { + enabled: boolean; + lookbackDays: number; + limit: number; + dedupeSimilarity: number; +}; +type RemDreamingConfig = DreamingPhaseStorageConfig & { + enabled: boolean; + lookbackDays: number; + limit: number; + minPatternStrength: number; +}; type RunPhaseIfTriggeredParams = { cleanedBody: string; trigger?: string; workspaceDir?: string; - cfg?: OpenClawConfig; + cfg?: DreamingHostConfig; logger: Logger; subagent?: Parameters[0]["subagent"]; eventText: string; } & ( | { phase: "light"; - config: MemoryLightDreamingConfig & DreamingPhaseStorageConfig; + config: LightDreamingConfig; } | { phase: "rem"; - config: MemoryRemDreamingConfig & DreamingPhaseStorageConfig; + config: RemDreamingConfig; } ); const LIGHT_SLEEP_EVENT_TEXT = "__openclaw_memory_core_light_sleep__"; @@ -91,11 +106,13 @@ const MANAGED_DAILY_DREAMING_BLOCKS = [ ] as const; function resolveWorkspaces(params: { - cfg?: OpenClawConfig; + cfg?: DreamingHostConfig; fallbackWorkspaceDir?: string; }): string[] { const workspaceCandidates = params.cfg - ? resolveMemoryDreamingWorkspaces(params.cfg).map((entry) => entry.workspaceDir) + ? resolveMemoryDreamingWorkspaces( + params.cfg as Parameters[0], + ).map((entry) => entry.workspaceDir) : []; const seen = new Set(); const workspaces = workspaceCandidates.filter((workspaceDir) => { @@ -603,15 +620,14 @@ function buildSessionRenderedLine(params: { return `[${source}] ${params.snippet}`.slice(0, SESSION_INGESTION_MAX_SNIPPET_CHARS + 64); } -function resolveSessionAgentsForWorkspace( - cfg: OpenClawConfig | undefined, - workspaceDir: string, -): string[] { +function resolveSessionAgentsForWorkspace(cfg: DreamingHostConfig, workspaceDir: string): string[] { if (!cfg) { return []; } const target = normalizeWorkspaceKey(workspaceDir); - const workspaces = resolveMemoryDreamingWorkspaces(cfg); + const workspaces = resolveMemoryDreamingWorkspaces( + cfg as Parameters[0], + ); const match = workspaces.find((entry) => normalizeWorkspaceKey(entry.workspaceDir) === target); if (!match) { return []; @@ -668,7 +684,7 @@ async function appendSessionCorpusLines(params: { async function collectSessionIngestionBatches(params: { workspaceDir: string; - cfg?: OpenClawConfig; + cfg?: DreamingHostConfig; lookbackDays: number; nowMs: number; timezone?: string; @@ -956,7 +972,7 @@ async function collectSessionIngestionBatches(params: { async function ingestSessionTranscriptSignals(params: { workspaceDir: string; - cfg?: OpenClawConfig; + cfg?: DreamingHostConfig; lookbackDays: number; nowMs: number; timezone?: string; @@ -1473,11 +1489,8 @@ export function previewRemDreaming(params: { async function runLightDreaming(params: { workspaceDir: string; - cfg?: OpenClawConfig; - config: MemoryLightDreamingConfig & { - timezone?: string; - storage: { mode: "inline" | "separate" | "both"; separateReports: boolean }; - }; + cfg?: DreamingHostConfig; + config: LightDreamingConfig; logger: Logger; subagent?: Parameters[0]["subagent"]; nowMs?: number; @@ -1553,11 +1566,8 @@ async function runLightDreaming(params: { async function runRemDreaming(params: { workspaceDir: string; - cfg?: OpenClawConfig; - config: MemoryRemDreamingConfig & { - timezone?: string; - storage: { mode: "inline" | "separate" | "both"; separateReports: boolean }; - }; + cfg?: DreamingHostConfig; + config: RemDreamingConfig; logger: Logger; subagent?: Parameters[0]["subagent"]; nowMs?: number; @@ -1636,14 +1646,17 @@ async function runRemDreaming(params: { export async function runDreamingSweepPhases(params: { workspaceDir: string; pluginConfig?: Record; - cfg?: OpenClawConfig; + cfg?: DreamingHostConfig; logger: Logger; subagent?: Parameters[0]["subagent"]; nowMs?: number; }): Promise { + // Normalize nowMs once so all phase timestamps and narrative session keys are consistent. + const sweepNowMs: number = Number.isFinite(params.nowMs) ? (params.nowMs as number) : Date.now(); + const light = resolveMemoryLightDreamingConfig({ pluginConfig: params.pluginConfig, - cfg: params.cfg, + cfg: params.cfg as Parameters[0]["cfg"], }); if (light.enabled && light.limit > 0) { await runLightDreaming({ @@ -1652,13 +1665,25 @@ export async function runDreamingSweepPhases(params: { config: light, logger: params.logger, subagent: params.subagent, - nowMs: params.nowMs, + nowMs: sweepNowMs, }); + // Defensive cleanup: ensure the light-phase narrative session is deleted even if + // generateAndAppendDreamNarrative's primary cleanup was skipped due to an error. + if (params.subagent) { + const lightSessionKey = buildNarrativeSessionKey({ + workspaceDir: params.workspaceDir, + phase: "light", + nowMs: sweepNowMs, + }); + await params.subagent.deleteSession({ sessionKey: lightSessionKey }).catch(() => { + // Swallow errors — this is best-effort cleanup. + }); + } } const rem = resolveMemoryRemDreamingConfig({ pluginConfig: params.pluginConfig, - cfg: params.cfg, + cfg: params.cfg as Parameters[0]["cfg"], }); if (rem.enabled && rem.limit > 0) { await runRemDreaming({ @@ -1667,8 +1692,19 @@ export async function runDreamingSweepPhases(params: { config: rem, logger: params.logger, subagent: params.subagent, - nowMs: params.nowMs, + nowMs: sweepNowMs, }); + // Defensive cleanup: ensure the REM-phase narrative session is deleted. + if (params.subagent) { + const remSessionKey = buildNarrativeSessionKey({ + workspaceDir: params.workspaceDir, + phase: "rem", + nowMs: sweepNowMs, + }); + await params.subagent.deleteSession({ sessionKey: remSessionKey }).catch(() => { + // Swallow errors — this is best-effort cleanup. + }); + } } }