diff --git a/CHANGELOG.md b/CHANGELOG.md index 8709df1b256..e9c670b5c37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai - Infra/net: fix multipart FormData fields (including `model`) being silently dropped when a guarded runtime fetch body crosses a FormData implementation boundary, restoring OpenAI audio transcription requests that failed with HTTP 400. (#64349) Thanks @petr-sloup. - Dreaming/diary: use the host local timezone for diary timestamps when `dreaming.timezone` is unset, so `DREAMS.md` and the UI stop defaulting to UTC. (#65034) Thanks @neo1027144-creator and @vincentkoc. - Dreaming/diary: include the timezone abbreviation in diary timestamps so `DREAMS.md` and the UI make UTC or local host time explicit. (#65057) Thanks @Yanhu007 and @vincentkoc. +- Dreaming/narrative: harden transient narrative cleanup by retrying timed-out deletes and scrubbing stale dreaming session artifacts through the lock-aware session-store path. (#65320) Thanks @serkonyc and @vincentkoc. - Plugins/memory: restore cached memory capability public artifacts on plugin-registry cache hits so memory-backed artifact surfaces stay visible after warm loads. Thanks @sercada and @vincentkoc. - Gateway/cron: preserve requested isolated-agent config across runtime reloads so subagent jobs and heartbeat overrides keep the right workspace and heartbeat settings when the hot-loaded snapshot is stale. Thanks @l0cka and @vincentkoc. - Gateway/plugins: always send a non-empty `idempotencyKey` for plugin subagent runs, so dreaming narrative jobs stop failing gateway schema validation. (#65354) Thanks @CodeForgeNet and @vincentkoc. diff --git a/extensions/memory-core/src/dreaming-narrative.test.ts b/extensions/memory-core/src/dreaming-narrative.test.ts index f9627f11c4c..178f56e29ef 100644 --- a/extensions/memory-core/src/dreaming-narrative.test.ts +++ b/extensions/memory-core/src/dreaming-narrative.test.ts @@ -1,9 +1,11 @@ import fs from "node:fs/promises"; import path from "node:path"; +import * as configRuntimeModule from "openclaw/plugin-sdk/config-runtime"; import { RequestScopedSubagentRuntimeError, SUBAGENT_RUNTIME_REQUEST_SCOPE_ERROR_CODE, } from "openclaw/plugin-sdk/error-runtime"; +import * as memoryCoreHostRuntimeCoreModule from "openclaw/plugin-sdk/memory-core-host-runtime-core"; import { afterEach, describe, expect, it, vi } from "vitest"; import { resolveGlobalMap } from "../../../src/shared/global-singleton.js"; import { @@ -126,10 +128,7 @@ describe("formatNarrativeDate", () => { it("applies an explicit timezone", () => { // 2026-04-11T21:46:55Z in America/Los_Angeles (PDT, UTC-7) → 2:46 PM - const date = formatNarrativeDate( - Date.parse("2026-04-11T21:46:55Z"), - "America/Los_Angeles", - ); + const date = formatNarrativeDate(Date.parse("2026-04-11T21:46:55Z"), "America/Los_Angeles"); expect(date).toContain("2:46"); expect(date).toContain("PM"); expect(date).toContain("PDT"); @@ -654,6 +653,29 @@ describe("generateAndAppendDreamNarrative", () => { expect(exists).toBe(false); }); + it("waits once more before cleanup after timeout and logs cleanup failures", async () => { + const workspaceDir = await createTempWorkspace("openclaw-dreaming-narrative-"); + const subagent = createMockSubagent(""); + subagent.waitForRun + .mockResolvedValueOnce({ status: "timeout" }) + .mockResolvedValueOnce({ status: "ok" }); + subagent.deleteSession.mockRejectedValue(new Error("still active")); + const logger = createMockLogger(); + + await generateAndAppendDreamNarrative({ + subagent, + workspaceDir, + data: { phase: "rem", snippets: ["some memory"] }, + logger, + }); + + expect(subagent.waitForRun).toHaveBeenCalledTimes(2); + expect(subagent.waitForRun.mock.calls[1][0]).toMatchObject({ timeoutMs: 120_000 }); + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining("narrative session cleanup failed for rem phase"), + ); + }); + it("handles subagent error gracefully", async () => { const workspaceDir = await createTempWorkspace("openclaw-dreaming-narrative-"); const subagent = createMockSubagent(""); @@ -764,4 +786,66 @@ describe("generateAndAppendDreamNarrative", () => { expect(subagent.deleteSession).toHaveBeenCalled(); }); + + it("scrubs stale dreaming entries and orphan transcripts after cleanup", async () => { + const workspaceDir = await createTempWorkspace("openclaw-dreaming-narrative-"); + const stateDir = await createTempWorkspace("openclaw-dreaming-state-"); + const sessionsDir = path.join(stateDir, "agents", "main", "sessions"); + await fs.mkdir(sessionsDir, { recursive: true }); + const storePath = path.join(sessionsDir, "sessions.json"); + const orphanPath = path.join(sessionsDir, "orphan.jsonl"); + const livePath = path.join(sessionsDir, "still-live.jsonl"); + await fs.writeFile( + storePath, + `${JSON.stringify({ + "agent:main:dreaming-narrative-light-1": { + sessionId: "missing", + }, + "agent:main:kept-session": { + sessionId: "still-live", + }, + "agent:main:telegram:group:dreaming-narrative-room": { + sessionId: "still-missing-non-dreaming", + }, + })}\n`, + "utf-8", + ); + await fs.writeFile(orphanPath, '{"runId":"dreaming-narrative-light-123"}\n', "utf-8"); + await fs.writeFile(livePath, '{"runId":"dreaming-narrative-light-keep"}\n', "utf-8"); + const oldDate = new Date(Date.now() - 600_000); + await fs.utimes(orphanPath, oldDate, oldDate); + await fs.utimes(livePath, oldDate, oldDate); + + vi.spyOn(configRuntimeModule, "loadConfig").mockReturnValue({ session: {} } as never); + vi.spyOn(configRuntimeModule, "resolveStorePath").mockImplementation((( + _store: string | undefined, + { agentId }: { agentId: string }, + ) => { + expect(agentId).toBe("main"); + return storePath; + }) as typeof configRuntimeModule.resolveStorePath); + vi.spyOn(memoryCoreHostRuntimeCoreModule, "resolveStateDir").mockReturnValue(stateDir); + + const subagent = createMockSubagent("The repository whispered of forgotten endpoints."); + const logger = createMockLogger(); + + await generateAndAppendDreamNarrative({ + subagent, + workspaceDir, + data: { phase: "light", snippets: ["memory fragment"] }, + logger, + }); + + const updatedStore = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< + string, + unknown + >; + expect(updatedStore).not.toHaveProperty("agent:main:dreaming-narrative-light-1"); + expect(updatedStore).toHaveProperty("agent:main:kept-session"); + expect(updatedStore).toHaveProperty("agent:main:telegram:group:dreaming-narrative-room"); + const sessionFiles = await fs.readdir(sessionsDir); + expect(sessionFiles.some((name) => name.startsWith("orphan.jsonl.deleted."))).toBe(true); + expect(sessionFiles).toContain("still-live.jsonl"); + expect(logger.info).toHaveBeenCalledWith(expect.stringContaining("dreaming cleanup scrubbed")); + }); }); diff --git a/extensions/memory-core/src/dreaming-narrative.ts b/extensions/memory-core/src/dreaming-narrative.ts index 54032d1f410..6d579ddc822 100644 --- a/extensions/memory-core/src/dreaming-narrative.ts +++ b/extensions/memory-core/src/dreaming-narrative.ts @@ -1,5 +1,12 @@ +import type { Dirent } from "node:fs"; import fs from "node:fs/promises"; import path from "node:path"; +import { + loadConfig, + loadSessionStore, + resolveStorePath, + updateSessionStore, +} from "openclaw/plugin-sdk/config-runtime"; import { extractErrorCode, formatErrorMessage, @@ -9,6 +16,7 @@ import { } from "openclaw/plugin-sdk/error-runtime"; import { resolveGlobalMap } from "openclaw/plugin-sdk/global-singleton"; import { createAsyncLock } from "openclaw/plugin-sdk/infra-runtime"; +import { resolveStateDir } from "openclaw/plugin-sdk/memory-core-host-runtime-core"; // ── Types ────────────────────────────────────────────────────────────── @@ -76,6 +84,11 @@ const NARRATIVE_SYSTEM_PROMPT = [ ].join("\n"); const NARRATIVE_TIMEOUT_MS = 60_000; +const NARRATIVE_DELETE_SETTLE_TIMEOUT_MS = 120_000; +const DREAMING_SESSION_KEY_PREFIX = "dreaming-narrative-"; +const DREAMING_TRANSCRIPT_RUN_MARKER = '"runId":"dreaming-narrative-'; +const DREAMING_ORPHAN_MIN_AGE_MS = 300_000; +const SAFE_SESSION_ID_RE = /^[a-z0-9][a-z0-9._-]{0,127}$/i; const DREAMS_FILENAMES = ["DREAMS.md", "dreams.md"] as const; const DIARY_START_MARKER = ""; const DIARY_END_MARKER = ""; @@ -620,6 +633,194 @@ export async function appendNarrativeEntry(params: { // ── Orchestrator ─────────────────────────────────────────────────────── +async function safePathExists(pathname: string): Promise { + try { + await fs.stat(pathname); + return true; + } catch { + return false; + } +} + +function normalizeComparablePath(pathname: string): string { + return process.platform === "win32" ? pathname.toLowerCase() : pathname; +} + +async function normalizeSessionFileForComparison(params: { + sessionsDir: string; + sessionFile: string; +}): Promise { + const trimmed = params.sessionFile.trim(); + if (!trimmed) { + return null; + } + const resolved = path.isAbsolute(trimmed) ? trimmed : path.resolve(params.sessionsDir, trimmed); + try { + return normalizeComparablePath(await fs.realpath(resolved)); + } catch { + return normalizeComparablePath(path.resolve(resolved)); + } +} + +function isDreamingSessionStoreKey(sessionKey: string): boolean { + const firstSeparator = sessionKey.indexOf(":"); + if (firstSeparator < 0) { + return sessionKey.startsWith(DREAMING_SESSION_KEY_PREFIX); + } + const secondSeparator = sessionKey.indexOf(":", firstSeparator + 1); + const sessionSegment = secondSeparator < 0 ? sessionKey : sessionKey.slice(secondSeparator + 1); + return sessionSegment.startsWith(DREAMING_SESSION_KEY_PREFIX); +} + +async function normalizeSessionEntryPathForComparison(params: { + sessionsDir: string; + entry: { sessionFile?: string; sessionId?: string } | undefined; +}): Promise { + const sessionFile = typeof params.entry?.sessionFile === "string" ? params.entry.sessionFile : ""; + if (sessionFile) { + return normalizeSessionFileForComparison({ + sessionsDir: params.sessionsDir, + sessionFile, + }); + } + const sessionId = + typeof params.entry?.sessionId === "string" ? params.entry.sessionId.trim() : ""; + if (!SAFE_SESSION_ID_RE.test(sessionId)) { + return null; + } + return normalizeSessionFileForComparison({ + sessionsDir: params.sessionsDir, + sessionFile: `${sessionId}.jsonl`, + }); +} + +async function scrubDreamingNarrativeArtifacts(logger: Logger): Promise { + const cfg = loadConfig(); + const agentsDir = path.join(resolveStateDir(), "agents"); + let agentEntries: Dirent[] = []; + try { + agentEntries = await fs.readdir(agentsDir, { withFileTypes: true }); + } catch { + return; + } + + let prunedEntries = 0; + let archivedOrphans = 0; + + for (const agentEntry of agentEntries) { + if (!agentEntry.isDirectory()) { + continue; + } + + const storePath = resolveStorePath(cfg.session?.store, { agentId: agentEntry.name }); + const sessionsDir = path.dirname(storePath); + let store: Record; + try { + store = loadSessionStore(storePath) as Record< + string, + { sessionFile?: string; sessionId?: string } | undefined + >; + } catch { + continue; + } + + const referencedSessionFiles = new Set(); + let needsStoreUpdate = false; + for (const [key, entry] of Object.entries(store)) { + const normalizedSessionFile = await normalizeSessionEntryPathForComparison({ + sessionsDir, + entry, + }); + if (normalizedSessionFile) { + referencedSessionFiles.add(normalizedSessionFile); + } + if (!isDreamingSessionStoreKey(key)) { + continue; + } + if (!normalizedSessionFile || !(await safePathExists(normalizedSessionFile))) { + needsStoreUpdate = true; + } + } + + if (needsStoreUpdate) { + referencedSessionFiles.clear(); + prunedEntries += await updateSessionStore(storePath, async (lockedStore) => { + let prunedForAgent = 0; + for (const [key, entry] of Object.entries(lockedStore)) { + const normalizedSessionFile = await normalizeSessionEntryPathForComparison({ + sessionsDir, + entry, + }); + if (normalizedSessionFile) { + referencedSessionFiles.add(normalizedSessionFile); + } + if (!isDreamingSessionStoreKey(key)) { + continue; + } + if (!normalizedSessionFile || !(await safePathExists(normalizedSessionFile))) { + delete lockedStore[key]; + prunedForAgent += 1; + } + } + return prunedForAgent; + }); + } + + let sessionFiles: Dirent[] = []; + try { + sessionFiles = await fs.readdir(sessionsDir, { withFileTypes: true }); + } catch { + continue; + } + + for (const fileEntry of sessionFiles) { + if (!fileEntry.isFile() || !fileEntry.name.endsWith(".jsonl")) { + continue; + } + const transcriptPath = path.join(sessionsDir, fileEntry.name); + const normalizedTranscriptPath = + (await normalizeSessionFileForComparison({ + sessionsDir, + sessionFile: fileEntry.name, + })) ?? normalizeComparablePath(transcriptPath); + if (referencedSessionFiles.has(normalizedTranscriptPath)) { + continue; + } + let stat; + try { + stat = await fs.stat(transcriptPath); + } catch { + continue; + } + if (Date.now() - stat.mtimeMs < DREAMING_ORPHAN_MIN_AGE_MS) { + continue; + } + let content = ""; + try { + content = await fs.readFile(transcriptPath, "utf-8"); + } catch { + continue; + } + if (!content.includes(DREAMING_TRANSCRIPT_RUN_MARKER)) { + continue; + } + const archivedPath = `${transcriptPath}.deleted.${Date.now()}`; + try { + await fs.rename(transcriptPath, archivedPath); + archivedOrphans += 1; + } catch { + // best-effort scrubber + } + } + } + + if (prunedEntries > 0 || archivedOrphans > 0) { + logger.info( + `memory-core: dreaming cleanup scrubbed ${prunedEntries} stale session entr${prunedEntries === 1 ? "y" : "ies"} and archived ${archivedOrphans} orphan transcript${archivedOrphans === 1 ? "" : "s"}.`, + ); + } +} + export async function generateAndAppendDreamNarrative(params: { subagent: SubagentSurface; workspaceDir: string; @@ -636,9 +837,11 @@ export async function generateAndAppendDreamNarrative(params: { const sessionKey = `dreaming-narrative-${params.data.phase}-${nowMs}`; const message = buildNarrativePrompt(params.data); + let runId: string | null = null; + let waitStatus: string | null = null; try { - const runId = await startNarrativeRunOrFallback({ + runId = await startNarrativeRunOrFallback({ subagent: params.subagent, sessionKey, message, @@ -656,6 +859,7 @@ export async function generateAndAppendDreamNarrative(params: { runId, timeoutMs: NARRATIVE_TIMEOUT_MS, }); + waitStatus = result.status; if (result.status !== "ok") { params.logger.warn( @@ -693,11 +897,36 @@ export async function generateAndAppendDreamNarrative(params: { `memory-core: narrative generation failed for ${params.data.phase} phase: ${formatErrorMessage(err)}`, ); } finally { - // Clean up the transient session. + if (runId && waitStatus === "timeout") { + try { + const settle = await params.subagent.waitForRun({ + runId, + timeoutMs: NARRATIVE_DELETE_SETTLE_TIMEOUT_MS, + }); + if (settle.status !== "ok" && settle.status !== "error") { + params.logger.warn( + `memory-core: narrative cleanup wait ended with status=${settle.status} for ${params.data.phase} phase.`, + ); + } + } catch (cleanupWaitErr) { + params.logger.warn( + `memory-core: narrative cleanup wait failed for ${params.data.phase} phase: ${formatErrorMessage(cleanupWaitErr)}`, + ); + } + } + try { await params.subagent.deleteSession({ sessionKey }); - } catch { - // Ignore cleanup failures. + } catch (cleanupErr) { + params.logger.warn( + `memory-core: narrative session cleanup failed for ${params.data.phase} phase: ${formatErrorMessage(cleanupErr)}`, + ); } + + await scrubDreamingNarrativeArtifacts(params.logger).catch((scrubErr: unknown) => { + params.logger.warn( + `memory-core: dreaming cleanup scrub failed for ${params.data.phase} phase: ${formatErrorMessage(scrubErr)}`, + ); + }); } }