fix: harden dreaming narrative session cleanup (#65320)

* fix: harden dreaming narrative session cleanup

* fix(memory-core): harden narrative cleanup

* fix(memory-core): preserve fallback narrative sessions

---------

Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
This commit is contained in:
Sergiusz
2026-04-12 19:33:47 +03:00
committed by GitHub
parent aff8a0c0e7
commit 079eb18bf7
3 changed files with 322 additions and 8 deletions

View File

@@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai
- Infra/net: fix multipart FormData fields (including `model`) being silently dropped when a guarded runtime fetch body crosses a FormData implementation boundary, restoring OpenAI audio transcription requests that failed with HTTP 400. (#64349) Thanks @petr-sloup.
- Dreaming/diary: use the host local timezone for diary timestamps when `dreaming.timezone` is unset, so `DREAMS.md` and the UI stop defaulting to UTC. (#65034) Thanks @neo1027144-creator and @vincentkoc.
- Dreaming/diary: include the timezone abbreviation in diary timestamps so `DREAMS.md` and the UI make UTC or local host time explicit. (#65057) Thanks @Yanhu007 and @vincentkoc.
- Dreaming/narrative: harden transient narrative cleanup by retrying timed-out deletes and scrubbing stale dreaming session artifacts through the lock-aware session-store path. (#65320) Thanks @serkonyc and @vincentkoc.
- Plugins/memory: restore cached memory capability public artifacts on plugin-registry cache hits so memory-backed artifact surfaces stay visible after warm loads. Thanks @sercada and @vincentkoc.
- Gateway/cron: preserve requested isolated-agent config across runtime reloads so subagent jobs and heartbeat overrides keep the right workspace and heartbeat settings when the hot-loaded snapshot is stale. Thanks @l0cka and @vincentkoc.
- Gateway/plugins: always send a non-empty `idempotencyKey` for plugin subagent runs, so dreaming narrative jobs stop failing gateway schema validation. (#65354) Thanks @CodeForgeNet and @vincentkoc.

View File

@@ -1,9 +1,11 @@
import fs from "node:fs/promises";
import path from "node:path";
import * as configRuntimeModule from "openclaw/plugin-sdk/config-runtime";
import {
RequestScopedSubagentRuntimeError,
SUBAGENT_RUNTIME_REQUEST_SCOPE_ERROR_CODE,
} from "openclaw/plugin-sdk/error-runtime";
import * as memoryCoreHostRuntimeCoreModule from "openclaw/plugin-sdk/memory-core-host-runtime-core";
import { afterEach, describe, expect, it, vi } from "vitest";
import { resolveGlobalMap } from "../../../src/shared/global-singleton.js";
import {
@@ -126,10 +128,7 @@ describe("formatNarrativeDate", () => {
it("applies an explicit timezone", () => {
// 2026-04-11T21:46:55Z in America/Los_Angeles (PDT, UTC-7) → 2:46 PM
const date = formatNarrativeDate(
Date.parse("2026-04-11T21:46:55Z"),
"America/Los_Angeles",
);
const date = formatNarrativeDate(Date.parse("2026-04-11T21:46:55Z"), "America/Los_Angeles");
expect(date).toContain("2:46");
expect(date).toContain("PM");
expect(date).toContain("PDT");
@@ -654,6 +653,29 @@ describe("generateAndAppendDreamNarrative", () => {
expect(exists).toBe(false);
});
it("waits once more before cleanup after timeout and logs cleanup failures", async () => {
const workspaceDir = await createTempWorkspace("openclaw-dreaming-narrative-");
const subagent = createMockSubagent("");
subagent.waitForRun
.mockResolvedValueOnce({ status: "timeout" })
.mockResolvedValueOnce({ status: "ok" });
subagent.deleteSession.mockRejectedValue(new Error("still active"));
const logger = createMockLogger();
await generateAndAppendDreamNarrative({
subagent,
workspaceDir,
data: { phase: "rem", snippets: ["some memory"] },
logger,
});
expect(subagent.waitForRun).toHaveBeenCalledTimes(2);
expect(subagent.waitForRun.mock.calls[1][0]).toMatchObject({ timeoutMs: 120_000 });
expect(logger.warn).toHaveBeenCalledWith(
expect.stringContaining("narrative session cleanup failed for rem phase"),
);
});
it("handles subagent error gracefully", async () => {
const workspaceDir = await createTempWorkspace("openclaw-dreaming-narrative-");
const subagent = createMockSubagent("");
@@ -764,4 +786,66 @@ describe("generateAndAppendDreamNarrative", () => {
expect(subagent.deleteSession).toHaveBeenCalled();
});
it("scrubs stale dreaming entries and orphan transcripts after cleanup", async () => {
const workspaceDir = await createTempWorkspace("openclaw-dreaming-narrative-");
const stateDir = await createTempWorkspace("openclaw-dreaming-state-");
const sessionsDir = path.join(stateDir, "agents", "main", "sessions");
await fs.mkdir(sessionsDir, { recursive: true });
const storePath = path.join(sessionsDir, "sessions.json");
const orphanPath = path.join(sessionsDir, "orphan.jsonl");
const livePath = path.join(sessionsDir, "still-live.jsonl");
await fs.writeFile(
storePath,
`${JSON.stringify({
"agent:main:dreaming-narrative-light-1": {
sessionId: "missing",
},
"agent:main:kept-session": {
sessionId: "still-live",
},
"agent:main:telegram:group:dreaming-narrative-room": {
sessionId: "still-missing-non-dreaming",
},
})}\n`,
"utf-8",
);
await fs.writeFile(orphanPath, '{"runId":"dreaming-narrative-light-123"}\n', "utf-8");
await fs.writeFile(livePath, '{"runId":"dreaming-narrative-light-keep"}\n', "utf-8");
const oldDate = new Date(Date.now() - 600_000);
await fs.utimes(orphanPath, oldDate, oldDate);
await fs.utimes(livePath, oldDate, oldDate);
vi.spyOn(configRuntimeModule, "loadConfig").mockReturnValue({ session: {} } as never);
vi.spyOn(configRuntimeModule, "resolveStorePath").mockImplementation(((
_store: string | undefined,
{ agentId }: { agentId: string },
) => {
expect(agentId).toBe("main");
return storePath;
}) as typeof configRuntimeModule.resolveStorePath);
vi.spyOn(memoryCoreHostRuntimeCoreModule, "resolveStateDir").mockReturnValue(stateDir);
const subagent = createMockSubagent("The repository whispered of forgotten endpoints.");
const logger = createMockLogger();
await generateAndAppendDreamNarrative({
subagent,
workspaceDir,
data: { phase: "light", snippets: ["memory fragment"] },
logger,
});
const updatedStore = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<
string,
unknown
>;
expect(updatedStore).not.toHaveProperty("agent:main:dreaming-narrative-light-1");
expect(updatedStore).toHaveProperty("agent:main:kept-session");
expect(updatedStore).toHaveProperty("agent:main:telegram:group:dreaming-narrative-room");
const sessionFiles = await fs.readdir(sessionsDir);
expect(sessionFiles.some((name) => name.startsWith("orphan.jsonl.deleted."))).toBe(true);
expect(sessionFiles).toContain("still-live.jsonl");
expect(logger.info).toHaveBeenCalledWith(expect.stringContaining("dreaming cleanup scrubbed"));
});
});

View File

@@ -1,5 +1,12 @@
import type { Dirent } from "node:fs";
import fs from "node:fs/promises";
import path from "node:path";
import {
loadConfig,
loadSessionStore,
resolveStorePath,
updateSessionStore,
} from "openclaw/plugin-sdk/config-runtime";
import {
extractErrorCode,
formatErrorMessage,
@@ -9,6 +16,7 @@ import {
} from "openclaw/plugin-sdk/error-runtime";
import { resolveGlobalMap } from "openclaw/plugin-sdk/global-singleton";
import { createAsyncLock } from "openclaw/plugin-sdk/infra-runtime";
import { resolveStateDir } from "openclaw/plugin-sdk/memory-core-host-runtime-core";
// ── Types ──────────────────────────────────────────────────────────────
@@ -76,6 +84,11 @@ const NARRATIVE_SYSTEM_PROMPT = [
].join("\n");
const NARRATIVE_TIMEOUT_MS = 60_000;
const NARRATIVE_DELETE_SETTLE_TIMEOUT_MS = 120_000;
const DREAMING_SESSION_KEY_PREFIX = "dreaming-narrative-";
const DREAMING_TRANSCRIPT_RUN_MARKER = '"runId":"dreaming-narrative-';
const DREAMING_ORPHAN_MIN_AGE_MS = 300_000;
const SAFE_SESSION_ID_RE = /^[a-z0-9][a-z0-9._-]{0,127}$/i;
const DREAMS_FILENAMES = ["DREAMS.md", "dreams.md"] as const;
const DIARY_START_MARKER = "<!-- openclaw:dreaming:diary:start -->";
const DIARY_END_MARKER = "<!-- openclaw:dreaming:diary:end -->";
@@ -620,6 +633,194 @@ export async function appendNarrativeEntry(params: {
// ── Orchestrator ───────────────────────────────────────────────────────
async function safePathExists(pathname: string): Promise<boolean> {
try {
await fs.stat(pathname);
return true;
} catch {
return false;
}
}
function normalizeComparablePath(pathname: string): string {
return process.platform === "win32" ? pathname.toLowerCase() : pathname;
}
async function normalizeSessionFileForComparison(params: {
sessionsDir: string;
sessionFile: string;
}): Promise<string | null> {
const trimmed = params.sessionFile.trim();
if (!trimmed) {
return null;
}
const resolved = path.isAbsolute(trimmed) ? trimmed : path.resolve(params.sessionsDir, trimmed);
try {
return normalizeComparablePath(await fs.realpath(resolved));
} catch {
return normalizeComparablePath(path.resolve(resolved));
}
}
function isDreamingSessionStoreKey(sessionKey: string): boolean {
const firstSeparator = sessionKey.indexOf(":");
if (firstSeparator < 0) {
return sessionKey.startsWith(DREAMING_SESSION_KEY_PREFIX);
}
const secondSeparator = sessionKey.indexOf(":", firstSeparator + 1);
const sessionSegment = secondSeparator < 0 ? sessionKey : sessionKey.slice(secondSeparator + 1);
return sessionSegment.startsWith(DREAMING_SESSION_KEY_PREFIX);
}
async function normalizeSessionEntryPathForComparison(params: {
sessionsDir: string;
entry: { sessionFile?: string; sessionId?: string } | undefined;
}): Promise<string | null> {
const sessionFile = typeof params.entry?.sessionFile === "string" ? params.entry.sessionFile : "";
if (sessionFile) {
return normalizeSessionFileForComparison({
sessionsDir: params.sessionsDir,
sessionFile,
});
}
const sessionId =
typeof params.entry?.sessionId === "string" ? params.entry.sessionId.trim() : "";
if (!SAFE_SESSION_ID_RE.test(sessionId)) {
return null;
}
return normalizeSessionFileForComparison({
sessionsDir: params.sessionsDir,
sessionFile: `${sessionId}.jsonl`,
});
}
async function scrubDreamingNarrativeArtifacts(logger: Logger): Promise<void> {
const cfg = loadConfig();
const agentsDir = path.join(resolveStateDir(), "agents");
let agentEntries: Dirent[] = [];
try {
agentEntries = await fs.readdir(agentsDir, { withFileTypes: true });
} catch {
return;
}
let prunedEntries = 0;
let archivedOrphans = 0;
for (const agentEntry of agentEntries) {
if (!agentEntry.isDirectory()) {
continue;
}
const storePath = resolveStorePath(cfg.session?.store, { agentId: agentEntry.name });
const sessionsDir = path.dirname(storePath);
let store: Record<string, { sessionFile?: string; sessionId?: string } | undefined>;
try {
store = loadSessionStore(storePath) as Record<
string,
{ sessionFile?: string; sessionId?: string } | undefined
>;
} catch {
continue;
}
const referencedSessionFiles = new Set<string>();
let needsStoreUpdate = false;
for (const [key, entry] of Object.entries(store)) {
const normalizedSessionFile = await normalizeSessionEntryPathForComparison({
sessionsDir,
entry,
});
if (normalizedSessionFile) {
referencedSessionFiles.add(normalizedSessionFile);
}
if (!isDreamingSessionStoreKey(key)) {
continue;
}
if (!normalizedSessionFile || !(await safePathExists(normalizedSessionFile))) {
needsStoreUpdate = true;
}
}
if (needsStoreUpdate) {
referencedSessionFiles.clear();
prunedEntries += await updateSessionStore(storePath, async (lockedStore) => {
let prunedForAgent = 0;
for (const [key, entry] of Object.entries(lockedStore)) {
const normalizedSessionFile = await normalizeSessionEntryPathForComparison({
sessionsDir,
entry,
});
if (normalizedSessionFile) {
referencedSessionFiles.add(normalizedSessionFile);
}
if (!isDreamingSessionStoreKey(key)) {
continue;
}
if (!normalizedSessionFile || !(await safePathExists(normalizedSessionFile))) {
delete lockedStore[key];
prunedForAgent += 1;
}
}
return prunedForAgent;
});
}
let sessionFiles: Dirent[] = [];
try {
sessionFiles = await fs.readdir(sessionsDir, { withFileTypes: true });
} catch {
continue;
}
for (const fileEntry of sessionFiles) {
if (!fileEntry.isFile() || !fileEntry.name.endsWith(".jsonl")) {
continue;
}
const transcriptPath = path.join(sessionsDir, fileEntry.name);
const normalizedTranscriptPath =
(await normalizeSessionFileForComparison({
sessionsDir,
sessionFile: fileEntry.name,
})) ?? normalizeComparablePath(transcriptPath);
if (referencedSessionFiles.has(normalizedTranscriptPath)) {
continue;
}
let stat;
try {
stat = await fs.stat(transcriptPath);
} catch {
continue;
}
if (Date.now() - stat.mtimeMs < DREAMING_ORPHAN_MIN_AGE_MS) {
continue;
}
let content = "";
try {
content = await fs.readFile(transcriptPath, "utf-8");
} catch {
continue;
}
if (!content.includes(DREAMING_TRANSCRIPT_RUN_MARKER)) {
continue;
}
const archivedPath = `${transcriptPath}.deleted.${Date.now()}`;
try {
await fs.rename(transcriptPath, archivedPath);
archivedOrphans += 1;
} catch {
// best-effort scrubber
}
}
}
if (prunedEntries > 0 || archivedOrphans > 0) {
logger.info(
`memory-core: dreaming cleanup scrubbed ${prunedEntries} stale session entr${prunedEntries === 1 ? "y" : "ies"} and archived ${archivedOrphans} orphan transcript${archivedOrphans === 1 ? "" : "s"}.`,
);
}
}
export async function generateAndAppendDreamNarrative(params: {
subagent: SubagentSurface;
workspaceDir: string;
@@ -636,9 +837,11 @@ export async function generateAndAppendDreamNarrative(params: {
const sessionKey = `dreaming-narrative-${params.data.phase}-${nowMs}`;
const message = buildNarrativePrompt(params.data);
let runId: string | null = null;
let waitStatus: string | null = null;
try {
const runId = await startNarrativeRunOrFallback({
runId = await startNarrativeRunOrFallback({
subagent: params.subagent,
sessionKey,
message,
@@ -656,6 +859,7 @@ export async function generateAndAppendDreamNarrative(params: {
runId,
timeoutMs: NARRATIVE_TIMEOUT_MS,
});
waitStatus = result.status;
if (result.status !== "ok") {
params.logger.warn(
@@ -693,11 +897,36 @@ export async function generateAndAppendDreamNarrative(params: {
`memory-core: narrative generation failed for ${params.data.phase} phase: ${formatErrorMessage(err)}`,
);
} finally {
// Clean up the transient session.
if (runId && waitStatus === "timeout") {
try {
const settle = await params.subagent.waitForRun({
runId,
timeoutMs: NARRATIVE_DELETE_SETTLE_TIMEOUT_MS,
});
if (settle.status !== "ok" && settle.status !== "error") {
params.logger.warn(
`memory-core: narrative cleanup wait ended with status=${settle.status} for ${params.data.phase} phase.`,
);
}
} catch (cleanupWaitErr) {
params.logger.warn(
`memory-core: narrative cleanup wait failed for ${params.data.phase} phase: ${formatErrorMessage(cleanupWaitErr)}`,
);
}
}
try {
await params.subagent.deleteSession({ sessionKey });
} catch {
// Ignore cleanup failures.
} catch (cleanupErr) {
params.logger.warn(
`memory-core: narrative session cleanup failed for ${params.data.phase} phase: ${formatErrorMessage(cleanupErr)}`,
);
}
await scrubDreamingNarrativeArtifacts(params.logger).catch((scrubErr: unknown) => {
params.logger.warn(
`memory-core: dreaming cleanup scrub failed for ${params.data.phase} phase: ${formatErrorMessage(scrubErr)}`,
);
});
}
}