diff --git a/CHANGELOG.md b/CHANGELOG.md index 5aab345caf8..c46cd5908e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -90,6 +90,7 @@ Docs: https://docs.openclaw.ai - Codex harness/status: pin embedded harness selection per session, show active non-PI harness ids such as `codex` in `/status`, and keep legacy transcripts on PI until `/new` or `/reset` so config changes cannot hot-switch existing sessions. - Gateway/security: fail closed on agent-driven `gateway config.apply`/`config.patch` runtime edits by allowlisting a narrow set of agent-tunable prompt, model, and mention-gating paths (including Telegram topic-level `requireMention`) instead of relying on a hand-maintained denylist of protected subtrees that could miss new sensitive config keys. (#70726) Thanks @drobison00. - Webhooks/security: re-resolve `SecretRef`-backed webhook route secrets on each request so `openclaw secrets reload` revokes the previous secret immediately instead of waiting for a gateway restart. (#70727) Thanks @drobison00. +- Memory/dreaming: decouple the managed dreaming cron from heartbeat by running it as an isolated lightweight agent turn, so dreaming runs even when heartbeat is disabled for the default agent and is no longer skipped by `heartbeat.activeHours`. `openclaw doctor --fix` migrates stale main-session dreaming jobs in persisted cron configs to the new shape. Fixes #69811, #67397, #68972. (#70737) Thanks @jalehman. ## 2026.4.22 @@ -263,7 +264,6 @@ Docs: https://docs.openclaw.ai - CLI/channels: resolve channel presence through a shared policy that keeps ambient env vars and stale persisted auth from surfacing disabled bundled plugins in status, doctor, security audit, and cron delivery validation unless the channel or plugin is effectively enabled or explicitly configured. (#69862) Thanks @gumadeiras. - Doctor/plugins: hydrate legacy partial interactive handler state before plugin reload clears dedupe caches, so `openclaw doctor` and post-update doctor runs no longer crash with `Cannot read properties of undefined (reading 'clear')`. (#70135) Thanks @ngutman. - Control UI/config: preserve intentionally empty raw config snapshots when clearing pending updates so reset restores the original bytes instead of synthesizing JSON for blank config files. (#68178) Thanks @BunsDev. -- memory-core/dreaming: surface a `Dreaming status: blocked` line in `openclaw memory status` when dreaming is enabled but the heartbeat that drives the managed cron is not firing for the default agent, and add a Troubleshooting section to the dreaming docs covering the two common causes (per-agent `heartbeat` blocks excluding `main`, and `heartbeat.every` set to `0`/empty/invalid), so the silent failure described in #69843 becomes legible on the status surface. - Cron/run-log: report generic `message` tool sends under the resolved delivery channel when they match the cron target, while preserving account-specific mismatch checks for delivery traces. (#69940) Thanks @davehappyminion. - Doctor/channels: merge configured-channel doctor hooks across read-only, loaded, setup, and runtime plugin discovery so partial adapters no longer hide runtime-only compatibility repair or allowlist warnings, preserve disabled-channel opt-outs, and ignore malformed hook values before they can mask valid fallbacks. (#69919) Thanks @gumadeiras. - Models/CLI: show bundled provider-owned static catalog rows in `models list --all` before auth is configured, including Kimi K2.6 rows for Moonshot, OpenRouter, and Vercel AI Gateway, while keeping local-only and workspace plugin catalog paths isolated. (#69909) Thanks @shakkernerd. diff --git a/docs/concepts/dreaming.md b/docs/concepts/dreaming.md index e2687c46518..6f107b5c947 100644 --- a/docs/concepts/dreaming.md +++ b/docs/concepts/dreaming.md @@ -227,20 +227,8 @@ When enabled, the Gateway **Dreams** tab shows: - a distinct grounded Scene lane for staged historical replay entries - an expandable Dream Diary reader backed by `doctor.memory.dreamDiary` -## Troubleshooting - -### Dreaming never runs (status shows blocked) - -The managed dreaming cron rides the default agent's heartbeat. If heartbeat is not firing for that agent, the cron enqueues a system event that nobody consumes and dreaming silently does not run. Both `openclaw memory status` and `/dreaming status` will report `blocked` in that case and name the agent whose heartbeat is the blocker. - -Two common causes: - -- Another agent declares an explicit `heartbeat:` block. When any entry in `agents.list` has its own `heartbeat` block, only those agents heartbeat — the defaults stop applying to everyone else, so the default agent can go silent. Move the heartbeat settings to `agents.defaults.heartbeat`, or add an explicit `heartbeat` block on the default agent. See [Scope and precedence](/gateway/heartbeat#scope-and-precedence). -- `heartbeat.every` is `0`, empty, or unparsable. The cron has no interval to schedule against, so the heartbeat is effectively disabled. Set `every` to a positive duration such as `30m`. See [Defaults](/gateway/heartbeat#defaults). - ## Related -- [Heartbeat](/gateway/heartbeat) - [Memory](/concepts/memory) - [Memory Search](/concepts/memory-search) - [memory CLI](/cli/memory) diff --git a/extensions/memory-core/src/cli.runtime.ts b/extensions/memory-core/src/cli.runtime.ts index 56a8d4defe5..f09522daf30 100644 --- a/extensions/memory-core/src/cli.runtime.ts +++ b/extensions/memory-core/src/cli.runtime.ts @@ -44,10 +44,7 @@ import { type RepairDreamingArtifactsResult, } from "./dreaming-repair.js"; import { asRecord } from "./dreaming-shared.js"; -import { - resolveDreamingBlockedReason, - resolveShortTermPromotionDreamingConfig, -} from "./dreaming.js"; +import { resolveShortTermPromotionDreamingConfig } from "./dreaming.js"; import { previewGroundedRemMarkdown } from "./rem-evidence.js"; import { applyShortTermPromotions, @@ -853,10 +850,6 @@ export async function runMemoryStatus(opts: MemoryCommandOptions) { `${label("Workspace")} ${info(workspacePath)}`, `${label("Dreaming")} ${info(formatDreamingSummary(cfg))}`, ].filter(Boolean) as string[]; - const dreamingBlockedReason = resolveDreamingBlockedReason(cfg); - if (dreamingBlockedReason) { - lines.push(`${label("Dreaming status")} ${warn(`blocked - ${dreamingBlockedReason}`)}`); - } if (embeddingProbe) { const state = embeddingProbe.ok ? "ready" : "unavailable"; const stateColor = embeddingProbe.ok ? theme.success : theme.warn; diff --git a/extensions/memory-core/src/cli.test.ts b/extensions/memory-core/src/cli.test.ts index 195d1be8155..d463823554c 100644 --- a/extensions/memory-core/src/cli.test.ts +++ b/extensions/memory-core/src/cli.test.ts @@ -384,135 +384,6 @@ describe("memory cli", () => { }); }); - it("reports dreaming blocked when another explicit heartbeat agent excludes main", async () => { - loadConfig.mockReturnValue({ - plugins: { - entries: { - "memory-core": { - config: { - dreaming: { - enabled: true, - }, - }, - }, - }, - }, - agents: { - defaults: { - heartbeat: { - every: "30m", - }, - }, - list: [ - { id: "main", default: true }, - { - id: "ops", - heartbeat: { - every: "1h", - }, - }, - ], - }, - }); - const close = vi.fn(async () => {}); - mockManager({ - probeVectorAvailability: vi.fn(async () => true), - status: () => makeMemoryStatus({ workspaceDir: "/tmp/openclaw" }), - close, - }); - - const log = spyRuntimeLogs(defaultRuntime); - await runMemoryCli(["status", "--agent", "main"]); - - expect(log).toHaveBeenCalledWith( - expect.stringContaining( - 'Dreaming status: blocked - dreaming is enabled but will not run because heartbeat is disabled for "main". See https://docs.openclaw.ai/concepts/dreaming#troubleshooting', - ), - ); - expect(close).toHaveBeenCalled(); - }); - - it('reports dreaming blocked when main heartbeat interval is "0m"', async () => { - loadConfig.mockReturnValue({ - plugins: { - entries: { - "memory-core": { - config: { - dreaming: { - enabled: true, - }, - }, - }, - }, - }, - agents: { - defaults: { - heartbeat: { - every: "0m", - }, - }, - list: [{ id: "main", default: true }], - }, - }); - const close = vi.fn(async () => {}); - mockManager({ - probeVectorAvailability: vi.fn(async () => true), - status: () => makeMemoryStatus({ workspaceDir: "/tmp/openclaw" }), - close, - }); - - const log = spyRuntimeLogs(defaultRuntime); - await runMemoryCli(["status"]); - - expect(log).toHaveBeenCalledWith( - expect.stringContaining( - 'Dreaming status: blocked - dreaming is enabled but will not run because heartbeat is disabled for "main". See https://docs.openclaw.ai/concepts/dreaming#troubleshooting', - ), - ); - expect(close).toHaveBeenCalled(); - }); - - it("reports dreaming blocked for the configured default agent when it is not main", async () => { - resolveDefaultAgentId.mockReturnValue("ops"); - loadConfig.mockReturnValue({ - plugins: { - entries: { - "memory-core": { - config: { - dreaming: { - enabled: true, - }, - }, - }, - }, - }, - agents: { - defaults: { - heartbeat: { - every: "0m", - }, - }, - list: [{ id: "ops", default: true }], - }, - }); - const close = vi.fn(async () => {}); - mockManager({ - probeVectorAvailability: vi.fn(async () => true), - status: () => makeMemoryStatus({ workspaceDir: "/tmp/openclaw" }), - close, - }); - - const log = spyRuntimeLogs(defaultRuntime); - await runMemoryCli(["status", "--agent", "ops"]); - - expect(log).toHaveBeenCalledWith( - expect.stringContaining( - 'Dreaming status: blocked - dreaming is enabled but will not run because heartbeat is disabled for "ops". See https://docs.openclaw.ai/concepts/dreaming#troubleshooting', - ), - ); - expect(close).toHaveBeenCalled(); - }); - it("repairs invalid recall metadata and stale locks with status --fix", async () => { await withTempWorkspace(async (workspaceDir) => { const storePath = path.join(workspaceDir, "memory", ".dreams", "short-term-recall.json"); diff --git a/extensions/memory-core/src/concept-vocabulary.test.ts b/extensions/memory-core/src/concept-vocabulary.test.ts index dd8996bf4ae..da7f98beb89 100644 --- a/extensions/memory-core/src/concept-vocabulary.test.ts +++ b/extensions/memory-core/src/concept-vocabulary.test.ts @@ -59,6 +59,22 @@ describe("concept vocabulary", () => { expect(classifyConceptTagScript("qmd路由器")).toBe("mixed"); }); + it("drops chat scaffolding stop words from derived concept tags", () => { + const tags = deriveConceptTags({ + path: "memory/.dreams/session-corpus/2026-04-16.txt", + snippet: + "Assistant: the system should remind you about the Ollama provider setup in your workspace.", + }); + + expect(tags).toContain("ollama"); + expect(tags).toContain("provider"); + expect(tags).not.toContain("assistant"); + expect(tags).not.toContain("system"); + expect(tags).not.toContain("the"); + expect(tags).not.toContain("you"); + expect(tags).not.toContain("your"); + }); + it("summarizes entry coverage across latin, cjk, and mixed tags", () => { expect( summarizeConceptTagScriptCoverage([ diff --git a/extensions/memory-core/src/concept-vocabulary.ts b/extensions/memory-core/src/concept-vocabulary.ts index 43a4eba5fef..9a5d4c52bff 100644 --- a/extensions/memory-core/src/concept-vocabulary.ts +++ b/extensions/memory-core/src/concept-vocabulary.ts @@ -19,6 +19,7 @@ const LANGUAGE_STOP_WORDS = { "agent", "again", "also", + "assistant", "because", "before", "being", @@ -64,6 +65,8 @@ const LANGUAGE_STOP_WORDS = { "should", "since", "some", + "subagent", + "system", "than", "that", "their", @@ -73,13 +76,14 @@ const LANGUAGE_STOP_WORDS = { "this", "through", "today", + "user", "using", "with", "work", "workspace", "year", ], - english: ["and", "are", "for", "into", "its", "our", "then", "were"], + english: ["and", "are", "for", "into", "its", "our", "the", "then", "were", "you", "your"], spanish: [ "al", "con", diff --git a/extensions/memory-core/src/dreaming-command.test.ts b/extensions/memory-core/src/dreaming-command.test.ts index 6ad0d9451d5..95c5361ec06 100644 --- a/extensions/memory-core/src/dreaming-command.test.ts +++ b/extensions/memory-core/src/dreaming-command.test.ts @@ -197,95 +197,4 @@ describe("memory-core /dreaming command", () => { expect(result.text).toContain("Usage: /dreaming status"); expect(runtime.config.writeConfigFile).not.toHaveBeenCalled(); }); - - it("shows a blocked line directly after enabled when main heartbeat is disabled", async () => { - const { command } = createHarness({ - plugins: { - entries: { - "memory-core": { - config: { - dreaming: { - enabled: true, - }, - }, - }, - }, - }, - agents: { - defaults: { - heartbeat: { - every: "0m", - }, - }, - list: [{ id: "main", default: true }], - }, - }); - - const result = await command.handler(createCommandContext("status")); - const text = result.text ?? ""; - - expect(text).toContain( - '- blocked: dreaming is enabled but will not run because heartbeat is disabled for "main". See https://docs.openclaw.ai/concepts/dreaming#troubleshooting', - ); - - const lines = text.split("\n"); - const enabledIdx = lines.findIndex((line) => line.startsWith("- enabled:")); - const blockedIdx = lines.findIndex((line) => line.startsWith("- blocked:")); - expect(enabledIdx).toBeGreaterThan(-1); - expect(blockedIdx).toBe(enabledIdx + 1); - }); - - it("surfaces the blocked line on /dreaming on when main heartbeat is disabled", async () => { - const { command } = createHarness({ - agents: { - defaults: { - heartbeat: { - every: "0m", - }, - }, - list: [{ id: "main", default: true }], - }, - }); - - const result = await command.handler( - createCommandContext("on", { - gatewayClientScopes: ["operator.admin"], - }), - ); - const text = result.text ?? ""; - - expect(text).toContain("Dreaming enabled."); - expect(text).toContain( - '- blocked: dreaming is enabled but will not run because heartbeat is disabled for "main". See https://docs.openclaw.ai/concepts/dreaming#troubleshooting', - ); - }); - - it("omits the blocked line when dreaming is enabled and main heartbeat is healthy", async () => { - const { command } = createHarness({ - plugins: { - entries: { - "memory-core": { - config: { - dreaming: { - enabled: true, - }, - }, - }, - }, - }, - agents: { - defaults: { - heartbeat: { - every: "30m", - }, - }, - list: [{ id: "main", default: true }], - }, - }); - - const result = await command.handler(createCommandContext("status")); - - expect(result.text).toContain("- enabled: on"); - expect(result.text).not.toContain("- blocked:"); - }); }); diff --git a/extensions/memory-core/src/dreaming-command.ts b/extensions/memory-core/src/dreaming-command.ts index d10e0e2f3ec..2f202b3e10c 100644 --- a/extensions/memory-core/src/dreaming-command.ts +++ b/extensions/memory-core/src/dreaming-command.ts @@ -2,10 +2,7 @@ import type { OpenClawConfig, OpenClawPluginApi } from "openclaw/plugin-sdk/memo import { resolveMemoryDreamingConfig } from "openclaw/plugin-sdk/memory-core-host-status"; import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime"; import { asRecord } from "./dreaming-shared.js"; -import { - resolveDreamingBlockedReason, - resolveShortTermPromotionDreamingConfig, -} from "./dreaming.js"; +import { resolveShortTermPromotionDreamingConfig } from "./dreaming.js"; function resolveMemoryCorePluginConfig(cfg: OpenClawConfig): Record { const entry = asRecord(cfg.plugins?.entries?.["memory-core"]); @@ -57,12 +54,10 @@ function formatStatus(cfg: OpenClawConfig): string { }); const deep = resolveShortTermPromotionDreamingConfig({ pluginConfig, cfg }); const timezone = dreaming.timezone ? ` (${dreaming.timezone})` : ""; - const blockedReason = resolveDreamingBlockedReason(cfg); return [ "Dreaming status:", `- enabled: ${formatEnabled(dreaming.enabled)}${timezone}`, - ...(blockedReason ? [`- blocked: ${blockedReason}`] : []), `- sweep cadence: ${dreaming.frequency}`, `- promotion policy: score>=${deep.minScore}, recalls>=${deep.minRecallCount}, uniqueQueries>=${deep.minUniqueQueries}`, ].join("\n"); diff --git a/extensions/memory-core/src/dreaming-narrative.test.ts b/extensions/memory-core/src/dreaming-narrative.test.ts index e62ddaea61f..165f4d6ce64 100644 --- a/extensions/memory-core/src/dreaming-narrative.test.ts +++ b/extensions/memory-core/src/dreaming-narrative.test.ts @@ -603,6 +603,8 @@ describe("generateAndAppendDreamNarrative", () => { expect(subagent.run.mock.calls[0][0]).toMatchObject({ idempotencyKey: expectedSessionKey, sessionKey: expectedSessionKey, + lane: `dreaming-narrative:${expectedSessionKey}`, + lightContext: true, deliver: false, }); expect(subagent.waitForRun).toHaveBeenCalledOnce(); @@ -655,12 +657,10 @@ describe("generateAndAppendDreamNarrative", () => { expect(exists).toBe(false); }); - it("waits once more before cleanup after timeout and logs cleanup failures", async () => { + it("skips extra settle waits after timeout and still attempts cleanup", async () => { const workspaceDir = await createTempWorkspace("openclaw-dreaming-narrative-"); const subagent = createMockSubagent(""); - subagent.waitForRun - .mockResolvedValueOnce({ status: "timeout" }) - .mockResolvedValueOnce({ status: "ok" }); + subagent.waitForRun.mockResolvedValueOnce({ status: "timeout" }); subagent.deleteSession.mockRejectedValue(new Error("still active")); const logger = createMockLogger(); @@ -671,8 +671,8 @@ describe("generateAndAppendDreamNarrative", () => { logger, }); - expect(subagent.waitForRun).toHaveBeenCalledTimes(2); - expect(subagent.waitForRun.mock.calls[1][0]).toMatchObject({ timeoutMs: 120_000 }); + expect(subagent.waitForRun).toHaveBeenCalledOnce(); + expect(subagent.waitForRun.mock.calls[0][0]).toMatchObject({ timeoutMs: 15_000 }); expect(logger.warn).toHaveBeenCalledWith( expect.stringContaining("narrative session cleanup failed for rem phase"), ); diff --git a/extensions/memory-core/src/dreaming-narrative.ts b/extensions/memory-core/src/dreaming-narrative.ts index c8470b23f7f..cdbafc8bd82 100644 --- a/extensions/memory-core/src/dreaming-narrative.ts +++ b/extensions/memory-core/src/dreaming-narrative.ts @@ -27,6 +27,8 @@ type SubagentSurface = { sessionKey: string; message: string; extraSystemPrompt?: string; + lane?: string; + lightContext?: boolean; deliver?: boolean; }) => Promise<{ runId: string }>; waitForRun: (params: { @@ -84,8 +86,10 @@ const NARRATIVE_SYSTEM_PROMPT = [ "- Output ONLY the diary entry. No preamble, no sign-off, no commentary.", ].join("\n"); -const NARRATIVE_TIMEOUT_MS = 60_000; -const NARRATIVE_DELETE_SETTLE_TIMEOUT_MS = 120_000; +// Narrative generation is best-effort. Keep the timeout short so a stalled +// diary subagent does not leave the parent dreaming cron job "running" for +// minutes after the reports have already been written. +const NARRATIVE_TIMEOUT_MS = 15_000; const DREAMING_SESSION_KEY_PREFIX = "dreaming-narrative-"; const DREAMING_TRANSCRIPT_RUN_MARKER = '"runId":"dreaming-narrative-'; const DREAMING_ORPHAN_MIN_AGE_MS = 300_000; @@ -151,6 +155,8 @@ async function startNarrativeRunOrFallback(params: { sessionKey: params.sessionKey, message: params.message, extraSystemPrompt: NARRATIVE_SYSTEM_PROMPT, + lane: `dreaming-narrative:${params.sessionKey}`, + lightContext: true, deliver: false, }); return run.runId; @@ -855,8 +861,6 @@ export async function generateAndAppendDreamNarrative(params: { }); const message = buildNarrativePrompt(params.data); let runId: string | null = null; - let waitStatus: string | null = null; - try { runId = await startNarrativeRunOrFallback({ subagent: params.subagent, @@ -876,7 +880,6 @@ export async function generateAndAppendDreamNarrative(params: { runId, timeoutMs: NARRATIVE_TIMEOUT_MS, }); - waitStatus = result.status; if (result.status !== "ok") { params.logger.warn( @@ -914,24 +917,6 @@ export async function generateAndAppendDreamNarrative(params: { `memory-core: narrative generation failed for ${params.data.phase} phase: ${formatErrorMessage(err)}`, ); } finally { - if (params.subagent && runId && waitStatus === "timeout") { - try { - const settle = await params.subagent.waitForRun({ - runId, - timeoutMs: NARRATIVE_DELETE_SETTLE_TIMEOUT_MS, - }); - if (settle.status !== "ok" && settle.status !== "error") { - params.logger.warn( - `memory-core: narrative cleanup wait ended with status=${settle.status} for ${params.data.phase} phase.`, - ); - } - } catch (cleanupWaitErr) { - params.logger.warn( - `memory-core: narrative cleanup wait failed for ${params.data.phase} phase: ${formatErrorMessage(cleanupWaitErr)}`, - ); - } - } - // Guard against subagent becoming unavailable mid-flight (throws TypeError without this). if (params.subagent) { try { diff --git a/extensions/memory-core/src/dreaming-phases.test.ts b/extensions/memory-core/src/dreaming-phases.test.ts index 498238aded6..8e70e176035 100644 --- a/extensions/memory-core/src/dreaming-phases.test.ts +++ b/extensions/memory-core/src/dreaming-phases.test.ts @@ -977,6 +977,400 @@ describe("memory-core dreaming phases", () => { ]); }); + it("skips isolated cron run transcripts during session ingestion", async () => { + const workspaceDir = await createDreamingWorkspace(); + vi.stubEnv("OPENCLAW_TEST_FAST", "1"); + vi.stubEnv("OPENCLAW_STATE_DIR", path.join(workspaceDir, ".state")); + const sessionsDir = resolveSessionTranscriptsDirForAgent("main"); + await fs.mkdir(sessionsDir, { recursive: true }); + const transcriptPath = path.join(sessionsDir, "cron-run.jsonl"); + await fs.writeFile( + transcriptPath, + [ + JSON.stringify({ + type: "message", + message: { + role: "user", + timestamp: "2026-04-05T18:01:00.000Z", + content: + "[cron:job-1 Codex Sessions Sync] Run Codex sessions sync: 1. Convert sessions 2. Update qmd", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "assistant", + timestamp: "2026-04-05T18:02:00.000Z", + content: "Running Codex sessions sync...", + }, + }), + ].join("\n") + "\n", + "utf-8", + ); + await fs.writeFile( + path.join(sessionsDir, "sessions.json"), + JSON.stringify({ + "agent:main:cron:job-1:run:run-1": { + sessionId: "cron-run", + sessionFile: transcriptPath, + updatedAt: Date.now(), + }, + }), + "utf-8", + ); + + const { beforeAgentReply } = createHarness( + { + agents: { + defaults: { + workspace: workspaceDir, + }, + list: [{ id: "main", workspace: workspaceDir }], + }, + plugins: { + entries: { + "memory-core": { + config: { + dreaming: { + enabled: true, + phases: { + light: { + enabled: true, + limit: 20, + lookbackDays: 7, + }, + }, + }, + }, + }, + }, + }, + }, + workspaceDir, + ); + + try { + await beforeAgentReply( + { cleanedBody: "__openclaw_memory_core_light_sleep__" }, + { trigger: "heartbeat", workspaceDir }, + ); + } finally { + vi.unstubAllEnvs(); + } + + await expect( + fs.access(path.join(workspaceDir, "memory", ".dreams", "session-corpus", "2026-04-05.txt")), + ).rejects.toMatchObject({ code: "ENOENT" }); + + const sessionIngestion = JSON.parse( + await fs.readFile( + path.join(workspaceDir, "memory", ".dreams", "session-ingestion.json"), + "utf-8", + ), + ) as { + files: Record< + string, + { + lineCount: number; + lastContentLine: number; + contentHash: string; + } + >; + }; + expect(Object.values(sessionIngestion.files)).toEqual([ + expect.objectContaining({ + lineCount: 0, + lastContentLine: 0, + contentHash: expect.any(String), + }), + ]); + }); + + it("drops generated system wrapper text without suppressing paired assistant replies", async () => { + const workspaceDir = await createDreamingWorkspace(); + vi.stubEnv("OPENCLAW_TEST_FAST", "1"); + vi.stubEnv("OPENCLAW_STATE_DIR", path.join(workspaceDir, ".state")); + const sessionsDir = resolveSessionTranscriptsDirForAgent("main"); + await fs.mkdir(sessionsDir, { recursive: true }); + const transcriptPath = path.join(sessionsDir, "ordinary-session.jsonl"); + await fs.writeFile( + transcriptPath, + [ + JSON.stringify({ + type: "message", + message: { + role: "user", + timestamp: "2026-04-16T18:01:00.000Z", + content: + "System (untrusted): [2026-04-16 11:01:00 PDT] Exec completed (quiet-fo, code 0) :: Converted: 1", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "assistant", + timestamp: "2026-04-16T18:01:30.000Z", + content: "Handled internally.", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "user", + timestamp: "2026-04-16T18:02:00.000Z", + content: "What changed in the sync?", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "assistant", + timestamp: "2026-04-16T18:03:00.000Z", + content: "One new session was converted.", + }, + }), + ].join("\n") + "\n", + "utf-8", + ); + + const { beforeAgentReply } = createHarness( + { + agents: { + defaults: { + workspace: workspaceDir, + }, + list: [{ id: "main", workspace: workspaceDir }], + }, + plugins: { + entries: { + "memory-core": { + config: { + dreaming: { + enabled: true, + phases: { + light: { + enabled: true, + limit: 20, + lookbackDays: 7, + }, + }, + }, + }, + }, + }, + }, + }, + workspaceDir, + ); + + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-04-16T19:00:00.000Z")); + try { + await beforeAgentReply( + { cleanedBody: "__openclaw_memory_core_light_sleep__" }, + { trigger: "heartbeat", workspaceDir }, + ); + } finally { + vi.useRealTimers(); + vi.unstubAllEnvs(); + } + + const corpus = await fs.readFile( + path.join(workspaceDir, "memory", ".dreams", "session-corpus", "2026-04-16.txt"), + "utf-8", + ); + expect(corpus).toContain("User: What changed in the sync?"); + expect(corpus).toContain("Assistant: One new session was converted."); + expect(corpus).not.toContain("System (untrusted):"); + expect(corpus).toContain("Assistant: Handled internally."); + }); + + it("drops archive, cron, and heartbeat chatter from fresh session corpus output", async () => { + const workspaceDir = await createDreamingWorkspace(); + vi.stubEnv("OPENCLAW_TEST_FAST", "1"); + vi.stubEnv("OPENCLAW_STATE_DIR", path.join(workspaceDir, ".state")); + const sessionsDir = resolveSessionTranscriptsDirForAgent("main"); + await fs.mkdir(sessionsDir, { recursive: true }); + + await fs.writeFile( + path.join(sessionsDir, "archived.jsonl.deleted.2026-04-16T18-06-16.529Z"), + [ + JSON.stringify({ + type: "message", + message: { + role: "user", + timestamp: "2026-04-16T18:01:00.000Z", + content: "[cron:job-1 Example] Run the nightly sync", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "assistant", + timestamp: "2026-04-16T18:02:00.000Z", + content: "Running the nightly sync now.", + }, + }), + ].join("\n") + "\n", + "utf-8", + ); + await fs.writeFile( + path.join(sessionsDir, "ordinary.checkpoint.abc123.jsonl"), + JSON.stringify({ + type: "message", + message: { + role: "user", + timestamp: "2026-04-16T18:03:00.000Z", + content: "Checkpoint chatter should stay out.", + }, + }) + "\n", + "utf-8", + ); + await fs.writeFile( + path.join(sessionsDir, "ordinary.jsonl"), + [ + JSON.stringify({ + type: "message", + message: { + role: "user", + timestamp: "2026-04-16T18:04:00.000Z", + content: + "Read HEARTBEAT.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK.", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "assistant", + timestamp: "2026-04-16T18:05:00.000Z", + content: "HEARTBEAT_OK", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "user", + timestamp: "2026-04-16T18:06:00.000Z", + content: "[cron:job-2 Example] Run the qmd sync", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "assistant", + timestamp: "2026-04-16T18:07:00.000Z", + content: "Running the qmd sync now.", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "user", + timestamp: "2026-04-16T18:08:00.000Z", + content: "Document the Ollama provider setup.", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "assistant", + timestamp: "2026-04-16T18:09:00.000Z", + content: "I documented the Ollama provider setup in the workspace notes.", + }, + }), + ].join("\n") + "\n", + "utf-8", + ); + + const { beforeAgentReply } = createHarness( + { + agents: { + defaults: { + workspace: workspaceDir, + }, + list: [{ id: "main", workspace: workspaceDir }], + }, + plugins: { + entries: { + "memory-core": { + config: { + dreaming: { + enabled: true, + phases: { + light: { + enabled: true, + limit: 20, + lookbackDays: 7, + }, + }, + }, + }, + }, + }, + }, + }, + workspaceDir, + ); + + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-04-16T19:00:00.000Z")); + try { + await beforeAgentReply( + { cleanedBody: "__openclaw_memory_core_light_sleep__" }, + { trigger: "heartbeat", workspaceDir }, + ); + } finally { + vi.useRealTimers(); + vi.unstubAllEnvs(); + } + + const corpus = await fs.readFile( + path.join(workspaceDir, "memory", ".dreams", "session-corpus", "2026-04-16.txt"), + "utf-8", + ); + expect(corpus).toContain("User: Document the Ollama provider setup."); + expect(corpus).toContain( + "Assistant: I documented the Ollama provider setup in the workspace notes.", + ); + expect(corpus).not.toContain("Run the nightly sync"); + expect(corpus).not.toContain("Checkpoint chatter should stay out."); + expect(corpus).not.toContain("Read HEARTBEAT.md"); + expect(corpus).not.toContain("HEARTBEAT_OK"); + expect(corpus).not.toContain("Run the qmd sync"); + }); + + it("ignores chat scaffolding tags when building rem reflections", () => { + const preview = __testing.previewRemDreaming({ + entries: [ + { + key: "memory:1", + path: "memory/.dreams/session-corpus/2026-04-16.txt", + startLine: 1, + endLine: 1, + source: "memory", + snippet: "Assistant: I documented the Ollama provider setup.", + recallCount: 1, + dailyCount: 0, + groundedCount: 0, + totalScore: 0.6, + maxScore: 0.6, + firstRecalledAt: "2026-04-16T18:00:00.000Z", + lastRecalledAt: "2026-04-16T18:00:00.000Z", + queryHashes: ["q1"], + recallDays: ["2026-04-16"], + conceptTags: ["assistant", "the", "ollama", "provider"], + }, + ], + limit: 5, + minPatternStrength: 0, + }); + + expect(preview.reflections.join("\n")).toContain("`ollama`"); + expect(preview.reflections.join("\n")).toContain("`provider`"); + expect(preview.reflections.join("\n")).not.toContain("`assistant`"); + expect(preview.reflections.join("\n")).not.toContain("`the`"); + }); + it("does not reread unchanged dreaming-generated transcripts after checkpointing skip state", async () => { const workspaceDir = await createDreamingWorkspace(); vi.stubEnv("OPENCLAW_TEST_FAST", "1"); diff --git a/extensions/memory-core/src/dreaming-phases.ts b/extensions/memory-core/src/dreaming-phases.ts index f7979cbe656..099e7a12372 100644 --- a/extensions/memory-core/src/dreaming-phases.ts +++ b/extensions/memory-core/src/dreaming-phases.ts @@ -6,7 +6,7 @@ import type { OpenClawPluginApi } from "openclaw/plugin-sdk/memory-core"; import { buildSessionEntry, listSessionFilesForAgent, - loadDreamingNarrativeTranscriptPathSetForAgent, + loadSessionTranscriptClassificationForAgent, normalizeSessionTranscriptPathForComparison, parseUsageCountedSessionIdFromFileName, sessionPathForFile, @@ -191,6 +191,8 @@ type DailySnippetChunk = { snippet: string; }; +const REM_REFLECTION_TAG_BLACKLIST = new Set(["assistant", "user", "system", "subagent", "the"]); + function buildDailyChunkSnippet( heading: string | null, chunkLines: string[], @@ -710,21 +712,26 @@ async function collectSessionIngestionBatches(params: { agentId: string; absolutePath: string; generatedByDreamingNarrative: boolean; + generatedByCronRun: boolean; sessionPath: string; }> = []; for (const agentId of agentIds) { const files = await listSessionFilesForAgent(agentId); - const dreamingTranscriptPaths = + const transcriptClassification = files.length > 0 - ? loadDreamingNarrativeTranscriptPathSetForAgent(agentId) - : new Set(); + ? loadSessionTranscriptClassificationForAgent(agentId) + : { + dreamingNarrativeTranscriptPaths: new Set(), + cronRunTranscriptPaths: new Set(), + }; for (const absolutePath of files) { + const normalizedPath = normalizeSessionTranscriptPathForComparison(absolutePath); sessionFiles.push({ agentId, absolutePath, - generatedByDreamingNarrative: dreamingTranscriptPaths.has( - normalizeSessionTranscriptPathForComparison(absolutePath), - ), + generatedByDreamingNarrative: + transcriptClassification.dreamingNarrativeTranscriptPaths.has(normalizedPath), + generatedByCronRun: transcriptClassification.cronRunTranscriptPaths.has(normalizedPath), sessionPath: sessionPathForFile(absolutePath), }); } @@ -783,11 +790,12 @@ async function collectSessionIngestionBatches(params: { const entry = await buildSessionEntry(file.absolutePath, { generatedByDreamingNarrative: file.generatedByDreamingNarrative, + generatedByCronRun: file.generatedByCronRun, }); if (!entry) { continue; } - if (entry.generatedByDreamingNarrative) { + if (entry.generatedByDreamingNarrative || entry.generatedByCronRun) { nextFiles[stateKey] = { mtimeMs: fingerprint.mtimeMs, size: fingerprint.size, @@ -1414,7 +1422,7 @@ function buildRemReflections( const tagStats = new Map }>(); for (const entry of entries) { for (const tag of entry.conceptTags) { - if (!tag) { + if (!tag || REM_REFLECTION_TAG_BLACKLIST.has(tag.toLowerCase())) { continue; } const stat = tagStats.get(tag) ?? { count: 0, evidence: new Set() }; @@ -1493,6 +1501,7 @@ async function runLightDreaming(params: { config: LightDreamingConfig; logger: Logger; subagent?: Parameters[0]["subagent"]; + detachNarratives?: boolean; nowMs?: number; }): Promise { const nowMs = Number.isFinite(params.nowMs) ? (params.nowMs as number) : Date.now(); @@ -1553,14 +1562,27 @@ async function runLightDreaming(params: { snippets: capped.map((e) => e.snippet).filter(Boolean), ...(themes.length > 0 ? { themes } : {}), }; - await generateAndAppendDreamNarrative({ - subagent: params.subagent, - workspaceDir: params.workspaceDir, - data, - nowMs, - timezone: params.config.timezone, - logger: params.logger, - }); + if (params.detachNarratives) { + queueMicrotask(() => { + void generateAndAppendDreamNarrative({ + subagent: params.subagent!, + workspaceDir: params.workspaceDir, + data, + nowMs, + timezone: params.config.timezone, + logger: params.logger, + }).catch(() => undefined); + }); + } else { + await generateAndAppendDreamNarrative({ + subagent: params.subagent, + workspaceDir: params.workspaceDir, + data, + nowMs, + timezone: params.config.timezone, + logger: params.logger, + }); + } } } @@ -1570,6 +1592,7 @@ async function runRemDreaming(params: { config: RemDreamingConfig; logger: Logger; subagent?: Parameters[0]["subagent"]; + detachNarratives?: boolean; nowMs?: number; }): Promise { const nowMs = Number.isFinite(params.nowMs) ? (params.nowMs as number) : Date.now(); @@ -1632,14 +1655,27 @@ async function runRemDreaming(params: { .filter(Boolean), ...(themes.length > 0 ? { themes } : {}), }; - await generateAndAppendDreamNarrative({ - subagent: params.subagent, - workspaceDir: params.workspaceDir, - data, - nowMs, - timezone: params.config.timezone, - logger: params.logger, - }); + if (params.detachNarratives) { + queueMicrotask(() => { + void generateAndAppendDreamNarrative({ + subagent: params.subagent!, + workspaceDir: params.workspaceDir, + data, + nowMs, + timezone: params.config.timezone, + logger: params.logger, + }).catch(() => undefined); + }); + } else { + await generateAndAppendDreamNarrative({ + subagent: params.subagent, + workspaceDir: params.workspaceDir, + data, + nowMs, + timezone: params.config.timezone, + logger: params.logger, + }); + } } } @@ -1660,6 +1696,7 @@ export async function runDreamingSweepPhases(params: { cfg?: DreamingHostConfig; logger: Logger; subagent?: Parameters[0]["subagent"]; + detachNarratives?: boolean; nowMs?: number; }): Promise { // Normalize nowMs once so all phase timestamps and narrative session keys are consistent. @@ -1677,10 +1714,14 @@ export async function runDreamingSweepPhases(params: { logger: params.logger, subagent: params.subagent, nowMs: sweepNowMs, + detachNarratives: params.detachNarratives, }); // Defensive cleanup: ensure the light-phase narrative session is deleted even if // generateAndAppendDreamNarrative's primary cleanup was skipped due to an error. - if (params.subagent) { + // Skip when narratives are detached: the queued subagent run hasn't read the + // session yet, so eager cleanup would race the writer and silently drop the + // diary entry. The narrative function does its own cleanup in finally{}. + if (params.subagent && !params.detachNarratives) { const lightSessionKey = buildNarrativeSessionKey({ workspaceDir: params.workspaceDir, phase: "light", @@ -1702,9 +1743,11 @@ export async function runDreamingSweepPhases(params: { logger: params.logger, subagent: params.subagent, nowMs: sweepNowMs, + detachNarratives: params.detachNarratives, }); // Defensive cleanup: ensure the REM-phase narrative session is deleted. - if (params.subagent) { + // Skip when narratives are detached (see light-phase comment above). + if (params.subagent && !params.detachNarratives) { const remSessionKey = buildNarrativeSessionKey({ workspaceDir: params.workspaceDir, phase: "rem", @@ -1777,6 +1820,7 @@ export function registerMemoryDreamingPhases(_api: OpenClawPluginApi): void { export const __testing = { runPhaseIfTriggered, + previewRemDreaming, constants: { LIGHT_SLEEP_EVENT_TEXT, REM_SLEEP_EVENT_TEXT, diff --git a/extensions/memory-core/src/dreaming-shared.test.ts b/extensions/memory-core/src/dreaming-shared.test.ts new file mode 100644 index 00000000000..c3df7f55225 --- /dev/null +++ b/extensions/memory-core/src/dreaming-shared.test.ts @@ -0,0 +1,40 @@ +import { describe, expect, it } from "vitest"; +import { includesSystemEventToken } from "./dreaming-shared.js"; + +const TOKEN = "__openclaw_memory_core_short_term_promotion_dream__"; + +describe("includesSystemEventToken", () => { + it("matches the bare token", () => { + expect(includesSystemEventToken(TOKEN, TOKEN)).toBe(true); + }); + + it("matches a token wrapped by an isolated-cron `[cron:]` prefix", () => { + expect(includesSystemEventToken(`[cron:abc-123] ${TOKEN}`, TOKEN)).toBe(true); + }); + + it("matches the token on its own line within multiline content", () => { + expect(includesSystemEventToken(`leading text\n${TOKEN}\ntrailing`, TOKEN)).toBe(true); + }); + + it("does NOT match a user message that merely embeds the token mid-sentence", () => { + expect( + includesSystemEventToken(`please tell me about ${TOKEN} when you have time`, TOKEN), + ).toBe(false); + }); + + it("does NOT match a user message with the token in a code-fence-style block", () => { + expect( + includesSystemEventToken(`here is a snippet:\n\`${TOKEN}\`\nwhat does that do?`, TOKEN), + ).toBe(false); + }); + + it("does NOT match an arbitrary wrapper the runtime does not produce", () => { + expect(includesSystemEventToken(`[somewrap] ${TOKEN}`, TOKEN)).toBe(false); + }); + + it("returns false for empty inputs", () => { + expect(includesSystemEventToken("", TOKEN)).toBe(false); + expect(includesSystemEventToken(TOKEN, "")).toBe(false); + expect(includesSystemEventToken(" ", TOKEN)).toBe(false); + }); +}); diff --git a/extensions/memory-core/src/dreaming-shared.ts b/extensions/memory-core/src/dreaming-shared.ts index 417690a7d93..427696ec94d 100644 --- a/extensions/memory-core/src/dreaming-shared.ts +++ b/extensions/memory-core/src/dreaming-shared.ts @@ -18,5 +18,15 @@ export function includesSystemEventToken(cleanedBody: string, eventText: string) if (normalizedBody === normalizedEventText) { return true; } - return normalizedBody.split(/\r?\n/).some((line) => line.trim() === normalizedEventText); + return normalizedBody.split(/\r?\n/).some((line) => { + const trimmed = line.trim(); + if (trimmed === normalizedEventText) { + return true; + } + // Isolated cron turns wrap the payload with a `[cron:] ...` prefix; strip + // that one known wrapper before matching so the dream sentinel still triggers + // without falling back to a broad substring match (which would let any user + // message embedding the token surface as a dream cron firing). + return trimmed.replace(/^\[cron:[^\]]+\]\s*/, "") === normalizedEventText; + }); } diff --git a/extensions/memory-core/src/dreaming.test.ts b/extensions/memory-core/src/dreaming.test.ts index 6cf7611737e..2f23f081d4d 100644 --- a/extensions/memory-core/src/dreaming.test.ts +++ b/extensions/memory-core/src/dreaming.test.ts @@ -71,6 +71,7 @@ function createCronHarness( ...job, ...(job.schedule ? { schedule: { ...job.schedule } } : {}), ...(job.payload ? { payload: { ...job.payload } } : {}), + ...(job.delivery ? { delivery: { ...job.delivery } } : {}), })); }, async add(input) { @@ -84,6 +85,7 @@ function createCronHarness( sessionTarget: input.sessionTarget, wakeMode: input.wakeMode, payload: { ...input.payload }, + ...(input.delivery ? { delivery: { ...input.delivery } } : {}), createdAtMs: Date.now(), }); return {}; @@ -104,6 +106,7 @@ function createCronHarness( ...(patch.sessionTarget ? { sessionTarget: patch.sessionTarget } : {}), ...(patch.wakeMode ? { wakeMode: patch.wakeMode } : {}), ...(patch.payload ? { payload: { ...patch.payload } } : {}), + ...(patch.delivery ? { delivery: { ...patch.delivery } } : {}), }; return {}; }, @@ -432,11 +435,15 @@ describe("short-term dreaming cron reconciliation", () => { expect(harness.addCalls).toHaveLength(1); expect(harness.addCalls[0]).toMatchObject({ name: constants.MANAGED_DREAMING_CRON_NAME, - sessionTarget: "main", + sessionTarget: "isolated", wakeMode: "now", + delivery: { + mode: "none", + }, payload: { - kind: "systemEvent", - text: constants.DREAMING_SYSTEM_EVENT_TEXT, + kind: "agentTurn", + message: constants.DREAMING_SYSTEM_EVENT_TEXT, + lightContext: true, }, schedule: { kind: "cron", @@ -471,6 +478,9 @@ describe("short-term dreaming cron reconciliation", () => { kind: "systemEvent", text: "stale-text", }, + delivery: { + mode: "announce", + }, createdAtMs: 1, }; const duplicate: CronJobLike = { @@ -506,8 +516,12 @@ describe("short-term dreaming cron reconciliation", () => { id: "job-primary", patch: { enabled: true, + sessionTarget: "isolated", wakeMode: "now", schedule: desired.schedule, + delivery: { + mode: "none", + }, payload: desired.payload, }, }); @@ -770,6 +784,9 @@ describe("gateway startup reconciliation", () => { expr: "15 4 * * *", tz: "UTC", }, + delivery: { + mode: "none", + }, }); expect(logger.info).toHaveBeenCalledWith( expect.stringContaining("created managed dreaming cron job"), @@ -1538,6 +1555,53 @@ describe("gateway startup reconciliation", () => { clearInternalHooks(); } }); + + it("handles managed dreaming cron triggers without a queued heartbeat event", async () => { + clearInternalHooks(); + const logger = createLogger(); + const harness = createCronHarness(); + const onMock = vi.fn(); + const api: DreamingPluginApiTestDouble = { + config: { + plugins: { + entries: { + "memory-core": { + config: { + dreaming: { + enabled: false, + }, + }, + }, + }, + }, + } as OpenClawConfig, + pluginConfig: {}, + logger, + runtime: {}, + on: onMock, + }; + + try { + registerShortTermPromotionDreamingForTest(api); + await triggerGatewayStart(onMock, { + config: api.config, + getCron: () => harness.cron, + }); + + const beforeAgentReply = getBeforeAgentReplyHandler(onMock); + const result = await beforeAgentReply( + { cleanedBody: constants.DREAMING_SYSTEM_EVENT_TEXT }, + { trigger: "cron", workspaceDir: ".", sessionKey: "cron:memory-dreaming" }, + ); + + expect(result).toEqual({ + handled: true, + reason: "memory-core: short-term dreaming disabled", + }); + } finally { + clearInternalHooks(); + } + }); }); describe("short-term dreaming trigger", () => { @@ -1635,6 +1699,51 @@ describe("short-term dreaming trigger", () => { expect(memoryText).toContain("Move backups to S3 Glacier."); }); + it("applies promotions when the managed dreaming token is wrapped by the cron label", async () => { + const logger = createLogger(); + const workspaceDir = await createTempWorkspace("memory-dreaming-cron-wrapper-"); + await writeDailyMemoryNote(workspaceDir, "2026-04-02", ["Move backups to S3 Glacier."]); + + await recordShortTermRecalls({ + workspaceDir, + query: "backup policy", + results: [ + { + path: "memory/2026-04-02.md", + startLine: 1, + endLine: 1, + score: 0.9, + snippet: "Move backups to S3 Glacier.", + source: "memory", + }, + ], + }); + + const result = await runShortTermDreamingPromotionIfTriggered({ + cleanedBody: [ + "[cron:e795558c-a273-4124-ba88-d4916688d977 Memory Dreaming Promotion] __openclaw_memory_core_short_term_promotion_dream__", + "Current time: Thursday, April 16th, 2026 - 3:10 PM (America/Los_Angeles) / 2026-04-16 22:10 UTC", + ].join("\n"), + trigger: "cron", + workspaceDir, + config: { + enabled: true, + cron: constants.DEFAULT_DREAMING_CRON_EXPR, + limit: 10, + minScore: 0, + minRecallCount: 0, + minUniqueQueries: 0, + recencyHalfLifeDays: constants.DEFAULT_DREAMING_RECENCY_HALF_LIFE_DAYS, + verboseLogging: false, + }, + logger, + }); + + expect(result?.handled).toBe(true); + const memoryText = await fs.readFile(path.join(workspaceDir, "MEMORY.md"), "utf-8"); + expect(memoryText).toContain("Move backups to S3 Glacier."); + }); + it("keeps one-off recalls out of long-term memory under default thresholds", async () => { const logger = createLogger(); const workspaceDir = await createTempWorkspace("memory-dreaming-strict-"); @@ -1687,7 +1796,7 @@ describe("short-term dreaming trigger", () => { expect(memoryText).toBe(""); }); - it("ignores non-heartbeat triggers", async () => { + it("ignores non-cron, non-heartbeat triggers", async () => { const logger = createLogger(); const result = await runShortTermDreamingPromotionIfTriggered({ cleanedBody: constants.DREAMING_SYSTEM_EVENT_TEXT, @@ -1708,6 +1817,108 @@ describe("short-term dreaming trigger", () => { expect(result).toBeUndefined(); }); + it("applies promotions when the managed dreaming isolated cron job fires", async () => { + const logger = createLogger(); + const workspaceDir = await createTempWorkspace("memory-dreaming-cron-"); + await writeDailyMemoryNote(workspaceDir, "2026-04-02", ["Move backups to S3 Glacier."]); + + await recordShortTermRecalls({ + workspaceDir, + query: "backup policy", + results: [ + { + path: "memory/2026-04-02.md", + startLine: 1, + endLine: 1, + score: 0.9, + snippet: "Move backups to S3 Glacier.", + source: "memory", + }, + ], + }); + + const result = await runShortTermDreamingPromotionIfTriggered({ + cleanedBody: constants.DREAMING_SYSTEM_EVENT_TEXT, + trigger: "cron", + workspaceDir, + config: { + enabled: true, + cron: constants.DEFAULT_DREAMING_CRON_EXPR, + limit: 10, + minScore: 0, + minRecallCount: 0, + minUniqueQueries: 0, + recencyHalfLifeDays: constants.DEFAULT_DREAMING_RECENCY_HALF_LIFE_DAYS, + verboseLogging: false, + }, + logger, + }); + + expect(result?.handled).toBe(true); + const memoryText = await fs.readFile(path.join(workspaceDir, "MEMORY.md"), "utf-8"); + expect(memoryText).toContain("Move backups to S3 Glacier."); + }); + + it("writes dream diary prose for managed cron dreaming", async () => { + const logger = createLogger(); + const workspaceDir = await createTempWorkspace("memory-dreaming-cron-no-narrative-"); + await writeDailyMemoryNote(workspaceDir, "2026-04-02", ["Move backups to S3 Glacier."]); + + await recordShortTermRecalls({ + workspaceDir, + query: "backup policy", + results: [ + { + path: "memory/2026-04-02.md", + startLine: 1, + endLine: 1, + score: 0.9, + snippet: "Move backups to S3 Glacier.", + source: "memory", + }, + ], + }); + + const subagent = { + run: vi.fn(async () => ({ runId: "narrative-run-1" })), + waitForRun: vi.fn(async () => ({ status: "ok" })), + getSessionMessages: vi.fn(async () => ({ + messages: [{ role: "assistant", content: "A diary entry." }], + })), + deleteSession: vi.fn(async () => {}), + }; + + const result = await runShortTermDreamingPromotionIfTriggered({ + cleanedBody: constants.DREAMING_SYSTEM_EVENT_TEXT, + trigger: "cron", + workspaceDir, + config: { + enabled: true, + cron: constants.DEFAULT_DREAMING_CRON_EXPR, + limit: 10, + minScore: 0, + minRecallCount: 0, + minUniqueQueries: 0, + recencyHalfLifeDays: constants.DEFAULT_DREAMING_RECENCY_HALF_LIFE_DAYS, + verboseLogging: false, + }, + logger, + subagent, + }); + + expect(result?.handled).toBe(true); + expect(subagent.run).toHaveBeenCalled(); + const memoryText = await fs.readFile(path.join(workspaceDir, "MEMORY.md"), "utf-8"); + expect(memoryText).toContain("Move backups to S3 Glacier."); + await vi.waitFor(async () => { + expect(subagent.waitForRun).toHaveBeenCalled(); + expect(subagent.getSessionMessages).toHaveBeenCalled(); + expect(subagent.deleteSession).toHaveBeenCalled(); + const dreamsText = await fs.readFile(path.join(workspaceDir, "DREAMS.md"), "utf-8"); + expect(dreamsText).toContain("A diary entry."); + }); + }); + it("skips dreaming promotion cleanly when limit is zero", async () => { const logger = createLogger(); const workspaceDir = await createTempWorkspace("memory-dreaming-limit-zero-"); diff --git a/extensions/memory-core/src/dreaming.ts b/extensions/memory-core/src/dreaming.ts index 874681c02cd..93ccba334dc 100644 --- a/extensions/memory-core/src/dreaming.ts +++ b/extensions/memory-core/src/dreaming.ts @@ -1,9 +1,4 @@ -import { resolveDefaultAgentId } from "openclaw/plugin-sdk/config-runtime"; -import { - isHeartbeatEnabledForAgent, - peekSystemEventEntries, - resolveHeartbeatIntervalMs, -} from "openclaw/plugin-sdk/infra-runtime"; +import { peekSystemEventEntries } from "openclaw/plugin-sdk/infra-runtime"; import type { OpenClawConfig, OpenClawPluginApi } from "openclaw/plugin-sdk/memory-core"; import { DEFAULT_MEMORY_DREAMING_FREQUENCY as DEFAULT_MEMORY_DREAMING_CRON_EXPR, @@ -34,7 +29,6 @@ import { const MANAGED_DREAMING_CRON_NAME = "Memory Dreaming Promotion"; const MANAGED_DREAMING_CRON_TAG = "[managed-by=memory-core.short-term-promotion]"; const DREAMING_SYSTEM_EVENT_TEXT = "__openclaw_memory_core_short_term_promotion_dream__"; -const CRON_SESSION_TARGET_MAIN = "main" as const; const LEGACY_LIGHT_SLEEP_CRON_NAME = "Memory Light Dreaming"; const LEGACY_LIGHT_SLEEP_CRON_TAG = "[managed-by=memory-core.dreaming.light]"; const LEGACY_LIGHT_SLEEP_EVENT_TEXT = "__openclaw_memory_core_light_sleep__"; @@ -47,15 +41,20 @@ const HEARTBEAT_ISOLATED_SESSION_SUFFIX = ":heartbeat"; type Logger = Pick; type CronSchedule = { kind: "cron"; expr: string; tz?: string }; -type CronPayload = { kind: "systemEvent"; text: string }; +type CronPayload = + | { kind: "systemEvent"; text: string } + | { kind: "agentTurn"; message: string; lightContext?: boolean }; type ManagedCronJobCreate = { name: string; description: string; enabled: boolean; schedule: CronSchedule; - sessionTarget: typeof CRON_SESSION_TARGET_MAIN; + sessionTarget: "main" | "isolated"; wakeMode: "now"; payload: CronPayload; + delivery?: { + mode: "none"; + }; }; type ManagedCronJobPatch = { @@ -63,9 +62,12 @@ type ManagedCronJobPatch = { description?: string; enabled?: boolean; schedule?: CronSchedule; - sessionTarget?: typeof CRON_SESSION_TARGET_MAIN; + sessionTarget?: "main" | "isolated"; wakeMode?: "now"; payload?: CronPayload; + delivery?: { + mode: "none"; + }; }; type ManagedCronJobLike = { @@ -83,6 +85,11 @@ type ManagedCronJobLike = { payload?: { kind?: string; text?: string; + message?: string; + lightContext?: boolean; + }; + delivery?: { + mode?: string; }; createdAtMs?: number; }; @@ -155,23 +162,41 @@ function buildManagedDreamingCronJob( expr: config.cron, ...(config.timezone ? { tz: config.timezone } : {}), }, - sessionTarget: CRON_SESSION_TARGET_MAIN, + sessionTarget: "isolated", wakeMode: "now", payload: { - kind: "systemEvent", - text: DREAMING_SYSTEM_EVENT_TEXT, + kind: "agentTurn", + message: DREAMING_SYSTEM_EVENT_TEXT, + lightContext: true, + }, + // Dreaming is a maintenance sweep, not a user-facing announce job. + delivery: { + mode: "none", }, }; } +function resolveManagedDreamingPayloadToken( + payload: ManagedCronJobLike["payload"], +): string | undefined { + const payloadKind = normalizeLowercaseStringOrEmpty(normalizeTrimmedString(payload?.kind)); + if (payloadKind === "systemevent") { + return normalizeTrimmedString(payload?.text); + } + if (payloadKind === "agentturn") { + return normalizeTrimmedString(payload?.message); + } + return undefined; +} + function isManagedDreamingJob(job: ManagedCronJobLike): boolean { const description = normalizeTrimmedString(job.description); if (description?.includes(MANAGED_DREAMING_CRON_TAG)) { return true; } const name = normalizeTrimmedString(job.name); - const payloadText = normalizeTrimmedString(job.payload?.text); - return name === MANAGED_DREAMING_CRON_NAME && payloadText === DREAMING_SYSTEM_EVENT_TEXT; + const payloadToken = resolveManagedDreamingPayloadToken(job.payload); + return name === MANAGED_DREAMING_CRON_NAME && payloadToken === DREAMING_SYSTEM_EVENT_TEXT; } function isLegacyPhaseDreamingJob(job: ManagedCronJobLike): boolean { @@ -255,8 +280,8 @@ function buildManagedDreamingPatch( } const sessionTarget = normalizeLowercaseStringOrEmpty(normalizeTrimmedString(job.sessionTarget)); - if (sessionTarget !== "main") { - patch.sessionTarget = "main"; + if (sessionTarget !== desired.sessionTarget) { + patch.sessionTarget = desired.sessionTarget; } const wakeMode = normalizeLowercaseStringOrEmpty(normalizeTrimmedString(job.wakeMode)); if (wakeMode !== "now") { @@ -264,10 +289,21 @@ function buildManagedDreamingPatch( } const payloadKind = normalizeLowercaseStringOrEmpty(normalizeTrimmedString(job.payload?.kind)); - const payloadText = normalizeTrimmedString(job.payload?.text); - if (payloadKind !== "systemevent" || !compareOptionalStrings(payloadText, desired.payload.text)) { + const payloadToken = resolveManagedDreamingPayloadToken(job.payload); + const desiredPayloadToken = + desired.payload.kind === "systemEvent" ? desired.payload.text : desired.payload.message; + const payloadNeedsUpdate = + payloadKind !== normalizeLowercaseStringOrEmpty(desired.payload.kind) || + !compareOptionalStrings(payloadToken, desiredPayloadToken) || + (desired.payload.kind === "agentTurn" && + job.payload?.lightContext !== desired.payload.lightContext); + if (payloadNeedsUpdate) { patch.payload = desired.payload; } + const deliveryMode = normalizeLowercaseStringOrEmpty(normalizeTrimmedString(job.delivery?.mode)); + if (deliveryMode !== "none") { + patch.delivery = desired.delivery; + } return Object.keys(patch).length > 0 ? patch : null; } @@ -358,27 +394,6 @@ export function resolveShortTermPromotionDreamingConfig(params: { }; } -export function resolveDreamingBlockedReason(cfg: OpenClawConfig): string | null { - const pluginConfig = resolveMemoryCorePluginConfig(cfg); - const dreaming = resolveShortTermPromotionDreamingConfig({ pluginConfig, cfg }); - if (!dreaming.enabled) { - return null; - } - - const defaultAgentId = resolveDefaultAgentId(cfg); - // Mirror the managed dreaming wake path in server-cron: the job carries no - // agentId/sessionKey, so the wake uses defaults-only heartbeat. Not using - // resolveHeartbeatSummaryForAgent since it would apply the per-agent override - // and diverge from actual runtime behavior. - const enabledForDefault = isHeartbeatEnabledForAgent(cfg, defaultAgentId); - const intervalMs = resolveHeartbeatIntervalMs(cfg, undefined, cfg.agents?.defaults?.heartbeat); - if (enabledForDefault && intervalMs != null) { - return null; - } - - return `dreaming is enabled but will not run because heartbeat is disabled for "${defaultAgentId}". See https://docs.openclaw.ai/concepts/dreaming#troubleshooting`; -} - export async function reconcileShortTermDreamingCronJob(params: { cron: CronServiceLike | null; config: ShortTermPromotionDreamingConfig; @@ -473,7 +488,7 @@ export async function runShortTermDreamingPromotionIfTriggered(params: { logger: Logger; subagent?: Parameters[0]["subagent"]; }): Promise<{ handled: true; reason: string } | undefined> { - if (params.trigger !== "heartbeat") { + if (params.trigger !== "heartbeat" && params.trigger !== "cron") { return undefined; } if (!includesSystemEventToken(params.cleanedBody, DREAMING_SYSTEM_EVENT_TEXT)) { @@ -521,6 +536,7 @@ export async function runShortTermDreamingPromotionIfTriggered(params: { let totalApplied = 0; let failedWorkspaces = 0; const pluginConfig = params.cfg ? resolveMemoryCorePluginConfig(params.cfg) : undefined; + const detachNarratives = params.trigger === "cron"; for (const workspaceDir of workspaces) { try { const sweepNowMs = Date.now(); @@ -530,6 +546,7 @@ export async function runShortTermDreamingPromotionIfTriggered(params: { cfg: params.cfg, logger: params.logger, subagent: params.subagent, + detachNarratives, nowMs: sweepNowMs, }); @@ -608,14 +625,27 @@ export async function runShortTermDreamingPromotionIfTriggered(params: { snippets: candidates.map((c) => c.snippet).filter(Boolean), promotions: applied.appliedCandidates.map((c) => c.snippet).filter(Boolean), }; - await generateAndAppendDreamNarrative({ - subagent: params.subagent, - workspaceDir, - data, - nowMs: sweepNowMs, - timezone: params.config.timezone, - logger: params.logger, - }); + if (detachNarratives) { + queueMicrotask(() => { + void generateAndAppendDreamNarrative({ + subagent: params.subagent!, + workspaceDir, + data, + nowMs: sweepNowMs, + timezone: params.config.timezone, + logger: params.logger, + }).catch(() => undefined); + }); + } else { + await generateAndAppendDreamNarrative({ + subagent: params.subagent, + workspaceDir, + data, + nowMs: sweepNowMs, + timezone: params.config.timezone, + logger: params.logger, + }); + } } } catch (err) { failedWorkspaces += 1; @@ -736,17 +766,21 @@ export function registerShortTermPromotionDreaming(api: OpenClawPluginApi): void api.on("before_agent_reply", async (event, ctx) => { try { - if (ctx.trigger !== "heartbeat") { + if (ctx.trigger !== "heartbeat" && ctx.trigger !== "cron") { return undefined; } const currentConfig = resolveCurrentConfig(); const config = await reconcileManagedDreamingCron({ reason: "runtime", }); - if ( - !hasPendingManagedDreamingCronEvent(ctx.sessionKey) || - !includesSystemEventToken(event.cleanedBody, DREAMING_SYSTEM_EVENT_TEXT) - ) { + const hasManagedDreamingToken = includesSystemEventToken( + event.cleanedBody, + DREAMING_SYSTEM_EVENT_TEXT, + ); + const isManagedHeartbeatTrigger = + ctx.trigger === "heartbeat" && hasPendingManagedDreamingCronEvent(ctx.sessionKey); + const isManagedCronTrigger = ctx.trigger === "cron"; + if (!hasManagedDreamingToken || (!isManagedHeartbeatTrigger && !isManagedCronTrigger)) { return undefined; } return await runShortTermDreamingPromotionIfTriggered({ diff --git a/src/agents/pi-embedded-runner/run.before-agent-reply-cron.test.ts b/src/agents/pi-embedded-runner/run.before-agent-reply-cron.test.ts new file mode 100644 index 00000000000..f2379a28faa --- /dev/null +++ b/src/agents/pi-embedded-runner/run.before-agent-reply-cron.test.ts @@ -0,0 +1,83 @@ +import { beforeAll, beforeEach, describe, expect, it } from "vitest"; +import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js"; +import { makeAttemptResult } from "./run.overflow-compaction.fixture.js"; +import { + loadRunOverflowCompactionHarness, + mockedGlobalHookRunner, + mockedRunEmbeddedAttempt, + overflowBaseRunParams, + resetRunOverflowCompactionHarnessMocks, +} from "./run.overflow-compaction.harness.js"; + +let runEmbeddedPiAgent: typeof import("./run.js").runEmbeddedPiAgent; + +describe("runEmbeddedPiAgent cron before_agent_reply seam", () => { + beforeAll(async () => { + ({ runEmbeddedPiAgent } = await loadRunOverflowCompactionHarness()); + }); + + beforeEach(() => { + resetRunOverflowCompactionHarnessMocks(); + }); + + it("lets before_agent_reply claim cron runs before the embedded attempt starts", async () => { + mockedGlobalHookRunner.hasHooks.mockImplementation( + (hookName: string) => hookName === "before_agent_reply", + ); + mockedGlobalHookRunner.runBeforeAgentReply.mockResolvedValue({ + handled: true, + reply: { text: "dreaming claimed" }, + }); + + const result = await runEmbeddedPiAgent({ + ...overflowBaseRunParams, + trigger: "cron", + prompt: "__openclaw_memory_core_short_term_promotion_dream__", + }); + + expect(mockedGlobalHookRunner.runBeforeAgentReply).toHaveBeenCalledWith( + { cleanedBody: "__openclaw_memory_core_short_term_promotion_dream__" }, + expect.objectContaining({ + agentId: "main", + sessionId: "test-session", + sessionKey: "test-key", + workspaceDir: "/tmp/workspace", + trigger: "cron", + }), + ); + expect(mockedRunEmbeddedAttempt).not.toHaveBeenCalled(); + expect(result.payloads?.[0]?.text).toBe("dreaming claimed"); + }); + + it("returns a silent payload when a cron hook claims without a reply body", async () => { + mockedGlobalHookRunner.hasHooks.mockImplementation( + (hookName: string) => hookName === "before_agent_reply", + ); + mockedGlobalHookRunner.runBeforeAgentReply.mockResolvedValue({ + handled: true, + }); + + const result = await runEmbeddedPiAgent({ + ...overflowBaseRunParams, + trigger: "cron", + }); + + expect(mockedRunEmbeddedAttempt).not.toHaveBeenCalled(); + expect(result.payloads?.[0]?.text).toBe(SILENT_REPLY_TOKEN); + }); + + it("does not invoke before_agent_reply for non-cron embedded runs", async () => { + mockedGlobalHookRunner.hasHooks.mockImplementation( + (hookName: string) => hookName === "before_agent_reply", + ); + mockedRunEmbeddedAttempt.mockResolvedValueOnce(makeAttemptResult({ promptError: null })); + + await runEmbeddedPiAgent({ + ...overflowBaseRunParams, + trigger: "user", + }); + + expect(mockedGlobalHookRunner.runBeforeAgentReply).not.toHaveBeenCalled(); + expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts index 94a21857ee9..7faf8c63f71 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts @@ -3,6 +3,7 @@ import type { ThinkLevel } from "../../auto-reply/thinking.js"; import { formatErrorMessage } from "../../infra/errors.js"; import type { PluginHookAgentContext, + PluginHookBeforeAgentReplyResult, PluginHookBeforeAgentStartResult, PluginHookBeforeModelResolveResult, PluginHookBeforePromptBuildResult, @@ -32,6 +33,12 @@ type MockCompactionResult = export const mockedGlobalHookRunner = { hasHooks: vi.fn((_hookName: string) => false), + runBeforeAgentReply: vi.fn( + async ( + _event: { cleanedBody: string }, + _ctx: PluginHookAgentContext, + ): Promise => undefined, + ), runBeforeAgentStart: vi.fn( async ( _event: { prompt: string; messages?: unknown[] }, @@ -202,6 +209,8 @@ export const overflowBaseRunParams = { export function resetRunOverflowCompactionHarnessMocks(): void { mockedGlobalHookRunner.hasHooks.mockReset(); mockedGlobalHookRunner.hasHooks.mockReturnValue(false); + mockedGlobalHookRunner.runBeforeAgentReply.mockReset(); + mockedGlobalHookRunner.runBeforeAgentReply.mockResolvedValue(undefined); mockedGlobalHookRunner.runBeforeAgentStart.mockReset(); mockedGlobalHookRunner.runBeforeAgentStart.mockResolvedValue(undefined); mockedGlobalHookRunner.runBeforePromptBuild.mockReset(); diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index c4671986e6a..38200f9a283 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -1,6 +1,8 @@ import { randomBytes } from "node:crypto"; import fs from "node:fs/promises"; +import type { ReplyPayload } from "../../auto-reply/reply-payload.js"; import type { ThinkLevel } from "../../auto-reply/thinking.js"; +import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js"; import { ensureContextEnginesInitialized } from "../../context-engine/init.js"; import { resolveContextEngine } from "../../context-engine/registry.js"; import { emitAgentPlanEvent } from "../../infra/agent-events.js"; @@ -213,6 +215,21 @@ function backfillSessionKey(params: { } } +function buildHandledReplyPayloads(reply?: ReplyPayload) { + const normalized = reply ?? { text: SILENT_REPLY_TOKEN }; + return [ + { + text: normalized.text, + mediaUrl: normalized.mediaUrl, + mediaUrls: normalized.mediaUrls, + replyToId: normalized.replyToId, + audioAsVoice: normalized.audioAsVoice, + isError: normalized.isError, + isReasoning: normalized.isReasoning, + }, + ]; +} + export async function runEmbeddedPiAgent( params: RunEmbeddedPiAgentParams, ): Promise { @@ -314,6 +331,27 @@ export async function runEmbeddedPiAgent( trigger: params.trigger, channelId: params.messageChannel ?? params.messageProvider ?? undefined, }; + if (params.trigger === "cron" && hookRunner?.hasHooks("before_agent_reply")) { + const hookResult = await hookRunner.runBeforeAgentReply( + { cleanedBody: params.prompt }, + hookCtx, + ); + if (hookResult?.handled) { + return { + payloads: buildHandledReplyPayloads(hookResult.reply), + meta: { + durationMs: Date.now() - started, + agentMeta: { + sessionId: params.sessionId, + provider, + model: modelId, + }, + finalAssistantVisibleText: hookResult.reply?.text ?? SILENT_REPLY_TOKEN, + finalAssistantRawText: hookResult.reply?.text ?? SILENT_REPLY_TOKEN, + }, + }; + } + } const hookSelection = await resolveHookModelSelection({ prompt: params.prompt, diff --git a/src/commands/doctor-cron-dreaming-payload-migration.constants-drift.test.ts b/src/commands/doctor-cron-dreaming-payload-migration.constants-drift.test.ts new file mode 100644 index 00000000000..820460bb5b0 --- /dev/null +++ b/src/commands/doctor-cron-dreaming-payload-migration.constants-drift.test.ts @@ -0,0 +1,53 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { describe, expect, it } from "vitest"; + +// Mirrored constants in src/commands/doctor-cron-dreaming-payload-migration.ts +// must match the source-of-truth values in +// extensions/memory-core/src/dreaming.ts. There is no shared module today +// because src/ does not import from extensions/, so this drift check stands +// in for that boundary: rename either side without updating the other and +// this test fails before the doctor migration silently stops matching jobs. +const MIRROR_PATH = path.resolve(__dirname, "doctor-cron-dreaming-payload-migration.ts"); +const SOURCE_PATH = path.resolve( + __dirname, + "..", + "..", + "extensions", + "memory-core", + "src", + "dreaming.ts", +); + +const NAMES = [ + "MANAGED_DREAMING_CRON_NAME", + "MANAGED_DREAMING_CRON_TAG", + "DREAMING_SYSTEM_EVENT_TEXT", +] as const; + +function extractStringConst(source: string, name: string): string { + const re = new RegExp(`\\bconst ${name}\\b\\s*=\\s*(['"\`])([^'"\`]*)\\1`); + const match = source.match(re); + if (!match || typeof match[2] !== "string") { + throw new Error(`could not find string const ${name}`); + } + return match[2]; +} + +describe("dreaming payload-migration constants drift", () => { + it("matches the source-of-truth values from extensions/memory-core/src/dreaming.ts", async () => { + const [mirror, source] = await Promise.all([ + fs.readFile(MIRROR_PATH, "utf-8"), + fs.readFile(SOURCE_PATH, "utf-8"), + ]); + + for (const name of NAMES) { + const mirrorValue = extractStringConst(mirror, name); + const sourceValue = extractStringConst(source, name); + expect( + mirrorValue, + `${name} drift: mirror in src/commands does not match extensions/memory-core/src/dreaming.ts`, + ).toBe(sourceValue); + } + }); +}); diff --git a/src/commands/doctor-cron-dreaming-payload-migration.test.ts b/src/commands/doctor-cron-dreaming-payload-migration.test.ts new file mode 100644 index 00000000000..9f2f3755ed9 --- /dev/null +++ b/src/commands/doctor-cron-dreaming-payload-migration.test.ts @@ -0,0 +1,177 @@ +import { describe, expect, it } from "vitest"; +import { + countStaleDreamingJobs, + migrateLegacyDreamingPayloadShape, +} from "./doctor-cron-dreaming-payload-migration.js"; + +const DREAMING_TOKEN = "__openclaw_memory_core_short_term_promotion_dream__"; +const DREAMING_TAG = "[managed-by=memory-core.short-term-promotion]"; + +function staleDreamingJob() { + return { + id: "job-1", + name: "Memory Dreaming Promotion", + description: `${DREAMING_TAG} Promote weighted short-term recalls.`, + enabled: true, + schedule: { kind: "cron", expr: "0 3 * * *" }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: DREAMING_TOKEN }, + } as Record; +} + +function migratedDreamingJob() { + return { + id: "job-1", + name: "Memory Dreaming Promotion", + description: `${DREAMING_TAG} Promote weighted short-term recalls.`, + enabled: true, + schedule: { kind: "cron", expr: "0 3 * * *" }, + sessionTarget: "isolated", + wakeMode: "now", + payload: { kind: "agentTurn", message: DREAMING_TOKEN, lightContext: true }, + delivery: { mode: "none" }, + } as Record; +} + +describe("migrateLegacyDreamingPayloadShape", () => { + it("rewrites stale main-session dreaming jobs to isolated agentTurn shape", () => { + const jobs = [staleDreamingJob()]; + const result = migrateLegacyDreamingPayloadShape(jobs); + expect(result).toEqual({ changed: true, rewrittenCount: 1 }); + expect(jobs[0]).toMatchObject({ + sessionTarget: "isolated", + payload: { kind: "agentTurn", message: DREAMING_TOKEN, lightContext: true }, + delivery: { mode: "none" }, + }); + }); + + it("identifies the managed job by description tag even when name was edited", () => { + const jobs = [{ ...staleDreamingJob(), name: "Custom Name" }]; + const result = migrateLegacyDreamingPayloadShape(jobs); + expect(result.rewrittenCount).toBe(1); + expect(jobs[0]).toMatchObject({ sessionTarget: "isolated" }); + }); + + it("identifies the managed job by name + payload token when description tag is missing", () => { + const job = staleDreamingJob(); + delete job.description; + const jobs = [job]; + const result = migrateLegacyDreamingPayloadShape(jobs); + expect(result.rewrittenCount).toBe(1); + }); + + it("is idempotent on already-migrated jobs", () => { + const jobs = [migratedDreamingJob()]; + const result = migrateLegacyDreamingPayloadShape(jobs); + expect(result).toEqual({ changed: false, rewrittenCount: 0 }); + expect(jobs[0]).toEqual(migratedDreamingJob()); + }); + + it("re-applies missing pieces (e.g. lightContext flag) on partially-migrated jobs", () => { + const job = migratedDreamingJob(); + (job.payload as Record).lightContext = false; + const jobs = [job]; + const result = migrateLegacyDreamingPayloadShape(jobs); + expect(result.rewrittenCount).toBe(1); + expect((jobs[0]?.payload as Record).lightContext).toBe(true); + }); + + it("normalizes delivery to mode=none when omitted on an isolated dreaming job", () => { + const job = migratedDreamingJob(); + delete job.delivery; + const jobs = [job]; + const result = migrateLegacyDreamingPayloadShape(jobs); + expect(result.rewrittenCount).toBe(1); + expect(jobs[0]?.delivery).toEqual({ mode: "none" }); + }); + + it("leaves unrelated cron jobs untouched", () => { + const unrelated = { + id: "job-2", + name: "Daily Standup Reminder", + description: "Reminds the team", + enabled: true, + schedule: { kind: "cron", expr: "0 9 * * 1-5" }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "agentTurn", message: "good morning" }, + } as Record; + const snapshot = JSON.parse(JSON.stringify(unrelated)) as Record; + const jobs = [unrelated]; + const result = migrateLegacyDreamingPayloadShape(jobs); + expect(result).toEqual({ changed: false, rewrittenCount: 0 }); + expect(jobs[0]).toEqual(snapshot); + }); + + it("ignores look-alike jobs whose payload token does not match", () => { + const lookalike = staleDreamingJob(); + delete lookalike.description; + (lookalike.payload as Record).text = "some other system event"; + const jobs = [lookalike]; + const result = migrateLegacyDreamingPayloadShape(jobs); + expect(result.rewrittenCount).toBe(0); + expect(jobs[0]?.sessionTarget).toBe("main"); + }); + + it("processes a mixed batch correctly", () => { + const jobs = [ + staleDreamingJob(), + { + id: "job-other", + name: "Other", + description: "x", + enabled: true, + schedule: { kind: "cron", expr: "0 0 * * *" }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "agentTurn", message: "hi" }, + }, + migratedDreamingJob(), + ] as Array>; + const result = migrateLegacyDreamingPayloadShape(jobs); + expect(result).toEqual({ changed: true, rewrittenCount: 1 }); + expect(jobs[0]).toMatchObject({ sessionTarget: "isolated" }); + expect(jobs[1]).toMatchObject({ sessionTarget: "main" }); + expect(jobs[2]).toEqual(migratedDreamingJob()); + }); +}); + +describe("countStaleDreamingJobs", () => { + it("counts fully-stale legacy jobs", () => { + expect(countStaleDreamingJobs([staleDreamingJob()])).toBe(1); + }); + + it("counts partially-migrated jobs (e.g. lightContext flipped to false)", () => { + const partial = migratedDreamingJob(); + (partial.payload as Record).lightContext = false; + expect(countStaleDreamingJobs([partial])).toBe(1); + }); + + it("counts partially-migrated jobs missing delivery", () => { + const partial = migratedDreamingJob(); + delete partial.delivery; + expect(countStaleDreamingJobs([partial])).toBe(1); + }); + + it("returns 0 for fully-migrated jobs", () => { + expect(countStaleDreamingJobs([migratedDreamingJob()])).toBe(0); + }); + + it("ignores unrelated jobs", () => { + expect( + countStaleDreamingJobs([ + { + id: "x", + name: "Other", + description: "", + enabled: true, + schedule: { kind: "cron", expr: "* * * * *" }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "agentTurn", message: "hi" }, + }, + ]), + ).toBe(0); + }); +}); diff --git a/src/commands/doctor-cron-dreaming-payload-migration.ts b/src/commands/doctor-cron-dreaming-payload-migration.ts new file mode 100644 index 00000000000..98f740dfa66 --- /dev/null +++ b/src/commands/doctor-cron-dreaming-payload-migration.ts @@ -0,0 +1,92 @@ +import { + normalizeOptionalLowercaseString, + normalizeOptionalString, +} from "../shared/string-coerce.js"; + +// Constants are owned by the memory-core dreaming implementation. Mirrored here +// so doctor can rewrite stale jobs without taking a runtime dep on the +// extension. Keep in sync if the memory-core constants change. +const MANAGED_DREAMING_CRON_NAME = "Memory Dreaming Promotion"; +const MANAGED_DREAMING_CRON_TAG = "[managed-by=memory-core.short-term-promotion]"; +const DREAMING_SYSTEM_EVENT_TEXT = "__openclaw_memory_core_short_term_promotion_dream__"; + +type UnknownRecord = Record; + +function isManagedDreamingJob(raw: UnknownRecord): boolean { + const description = normalizeOptionalString(raw.description); + if (description?.includes(MANAGED_DREAMING_CRON_TAG)) { + return true; + } + const name = normalizeOptionalString(raw.name); + if (name !== MANAGED_DREAMING_CRON_NAME) { + return false; + } + const payload = (raw.payload as UnknownRecord | undefined) ?? undefined; + const payloadKind = normalizeOptionalLowercaseString(payload?.kind); + if (payloadKind === "systemevent") { + return normalizeOptionalString(payload?.text) === DREAMING_SYSTEM_EVENT_TEXT; + } + if (payloadKind === "agentturn") { + return normalizeOptionalString(payload?.message) === DREAMING_SYSTEM_EVENT_TEXT; + } + return false; +} + +function isStaleDreamingJob(raw: UnknownRecord): boolean { + const sessionTarget = normalizeOptionalLowercaseString(raw.sessionTarget); + if (sessionTarget !== "isolated") { + return true; + } + const payload = (raw.payload as UnknownRecord | undefined) ?? undefined; + const payloadKind = normalizeOptionalLowercaseString(payload?.kind); + if (payloadKind !== "agentturn") { + return true; + } + if (payload?.lightContext !== true) { + return true; + } + const delivery = (raw.delivery as UnknownRecord | undefined) ?? undefined; + const deliveryMode = normalizeOptionalLowercaseString(delivery?.mode); + if (deliveryMode !== "none") { + return true; + } + return false; +} + +function rewriteDreamingJobShape(raw: UnknownRecord): void { + raw.sessionTarget = "isolated"; + raw.payload = { + kind: "agentTurn", + message: DREAMING_SYSTEM_EVENT_TEXT, + lightContext: true, + }; + raw.delivery = { mode: "none" }; +} + +export function migrateLegacyDreamingPayloadShape(jobs: UnknownRecord[]): { + changed: boolean; + rewrittenCount: number; +} { + let rewrittenCount = 0; + for (const raw of jobs) { + if (!isManagedDreamingJob(raw)) { + continue; + } + if (!isStaleDreamingJob(raw)) { + continue; + } + rewriteDreamingJobShape(raw); + rewrittenCount += 1; + } + return { changed: rewrittenCount > 0, rewrittenCount }; +} + +export function countStaleDreamingJobs(jobs: UnknownRecord[]): number { + let count = 0; + for (const raw of jobs) { + if (isManagedDreamingJob(raw) && isStaleDreamingJob(raw)) { + count += 1; + } + } + return count; +} diff --git a/src/commands/doctor-cron.test.ts b/src/commands/doctor-cron.test.ts index d44e66c9d45..5eb0a9e4bc2 100644 --- a/src/commands/doctor-cron.test.ts +++ b/src/commands/doctor-cron.test.ts @@ -334,4 +334,54 @@ describe("maybeRepairLegacyCronStore", () => { threadId: "99", }); }); + + it("rewrites stale managed dreaming jobs to the isolated agentTurn shape", async () => { + const storePath = await makeTempStorePath(); + await writeCronStore(storePath, [ + { + id: "memory-dreaming", + name: "Memory Dreaming Promotion", + description: + "[managed-by=memory-core.short-term-promotion] Promote weighted short-term recalls.", + enabled: true, + createdAtMs: Date.parse("2026-04-01T00:00:00.000Z"), + updatedAtMs: Date.parse("2026-04-01T00:00:00.000Z"), + schedule: { kind: "cron", expr: "0 3 * * *", tz: "UTC" }, + sessionTarget: "main", + wakeMode: "now", + payload: { + kind: "systemEvent", + text: "__openclaw_memory_core_short_term_promotion_dream__", + }, + state: {}, + }, + ]); + + const noteSpy = noteMock; + + await maybeRepairLegacyCronStore({ + cfg: createCronConfig(storePath), + options: {}, + prompter: makePrompter(true), + }); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as { + jobs: Array>; + }; + const [job] = persisted.jobs; + expect(job).toMatchObject({ + sessionTarget: "isolated", + payload: { + kind: "agentTurn", + message: "__openclaw_memory_core_short_term_promotion_dream__", + lightContext: true, + }, + delivery: { mode: "none" }, + }); + expect(noteSpy).toHaveBeenCalledWith(expect.stringContaining("managed dreaming job"), "Cron"); + expect(noteSpy).toHaveBeenCalledWith( + expect.stringContaining("Rewrote 1 managed dreaming job"), + "Doctor changes", + ); + }); }); diff --git a/src/commands/doctor-cron.ts b/src/commands/doctor-cron.ts index 6d06ea5a197..e85686fb65f 100644 --- a/src/commands/doctor-cron.ts +++ b/src/commands/doctor-cron.ts @@ -8,6 +8,10 @@ import { } from "../shared/string-coerce.js"; import { note } from "../terminal/note.js"; import { shortenHomePath } from "../utils.js"; +import { + countStaleDreamingJobs, + migrateLegacyDreamingPayloadShape, +} from "./doctor-cron-dreaming-payload-migration.js"; import { normalizeStoredCronJobs } from "./doctor-cron-store-migration.js"; import type { DoctorPrompter, DoctorOptions } from "./doctor-prompter.js"; @@ -140,12 +144,18 @@ export async function maybeRepairLegacyCronStore(params: { const normalized = normalizeStoredCronJobs(rawJobs); const legacyWebhook = normalizeOptionalString(params.cfg.cron?.webhook); const notifyCount = rawJobs.filter((job) => job.notify === true).length; + const dreamingStaleCount = countStaleDreamingJobs(rawJobs); const previewLines = formatLegacyIssuePreview(normalized.issues); if (notifyCount > 0) { previewLines.push( `- ${pluralize(notifyCount, "job")} still uses legacy \`notify: true\` webhook fallback`, ); } + if (dreamingStaleCount > 0) { + previewLines.push( + `- ${pluralize(dreamingStaleCount, "managed dreaming job")} still has the legacy heartbeat-coupled shape`, + ); + } if (previewLines.length === 0) { return; } @@ -171,7 +181,8 @@ export async function maybeRepairLegacyCronStore(params: { jobs: rawJobs, legacyWebhook, }); - const changed = normalized.mutated || notifyMigration.changed; + const dreamingMigration = migrateLegacyDreamingPayloadShape(rawJobs); + const changed = normalized.mutated || notifyMigration.changed || dreamingMigration.changed; if (!changed && notifyMigration.warnings.length === 0) { return; } @@ -182,6 +193,12 @@ export async function maybeRepairLegacyCronStore(params: { jobs: rawJobs as unknown as CronJob[], }); note(`Cron store normalized at ${shortenHomePath(storePath)}.`, "Doctor changes"); + if (dreamingMigration.rewrittenCount > 0) { + note( + `Rewrote ${pluralize(dreamingMigration.rewrittenCount, "managed dreaming job")} to run as an isolated agent turn so dreaming no longer requires heartbeat.`, + "Doctor changes", + ); + } } if (notifyMigration.warnings.length > 0) { diff --git a/src/gateway/server-plugins.test.ts b/src/gateway/server-plugins.test.ts index ca536fe4ef6..4e2451fe6a2 100644 --- a/src/gateway/server-plugins.test.ts +++ b/src/gateway/server-plugins.test.ts @@ -576,6 +576,28 @@ describe("loadGatewayPlugins", () => { }); }); + test("forwards lightContext as lightweight bootstrap context on subagent run", async () => { + const serverPlugins = serverPluginsModule; + const runtime = await createSubagentRuntime(serverPlugins); + serverPlugins.setFallbackGatewayContext(createTestContext("light-context-forward")); + + await runtime.run({ + sessionKey: "s-light-context", + message: "hello", + lightContext: true, + lane: "dreaming-narrative:s-light-context", + deliver: false, + }); + + expect(getLastDispatchedParams()).toMatchObject({ + sessionKey: "s-light-context", + message: "hello", + lane: "dreaming-narrative:s-light-context", + bootstrapContextMode: "lightweight", + deliver: false, + }); + }); + test("generates a non-empty idempotencyKey when the caller omits it", async () => { const serverPlugins = serverPluginsModule; const runtime = await createSubagentRuntime(serverPlugins); diff --git a/src/gateway/server-plugins.ts b/src/gateway/server-plugins.ts index 495b6da16cf..92689d7ad49 100644 --- a/src/gateway/server-plugins.ts +++ b/src/gateway/server-plugins.ts @@ -338,6 +338,7 @@ export function createGatewaySubagentRuntime(): PluginRuntime["subagent"] { ...(allowOverride && params.model && { model: params.model }), ...(params.extraSystemPrompt && { extraSystemPrompt: params.extraSystemPrompt }), ...(params.lane && { lane: params.lane }), + ...(params.lightContext === true && { bootstrapContextMode: "lightweight" }), // The gateway `agent` schema requires `idempotencyKey: NonEmptyString`, // so fall back to a generated UUID when the caller omits it. Without // this, plugin subagent runs (for example memory-core dreaming diff --git a/src/memory-host-sdk/engine-qmd.ts b/src/memory-host-sdk/engine-qmd.ts index 8ef479b3e4d..119dd49ef21 100644 --- a/src/memory-host-sdk/engine-qmd.ts +++ b/src/memory-host-sdk/engine-qmd.ts @@ -5,10 +5,12 @@ export { buildSessionEntry, listSessionFilesForAgent, loadDreamingNarrativeTranscriptPathSetForAgent, + loadSessionTranscriptClassificationForAgent, normalizeSessionTranscriptPathForComparison, sessionPathForFile, type BuildSessionEntryOptions, type SessionFileEntry, + type SessionTranscriptClassification, } from "./host/session-files.js"; export { parseUsageCountedSessionIdFromFileName } from "../config/sessions/artifacts.js"; export { parseQmdQueryJson, type QmdQueryResult } from "./host/qmd-query-parser.js"; diff --git a/src/memory-host-sdk/host/session-files.test.ts b/src/memory-host-sdk/host/session-files.test.ts index 73c6e1b051f..2cd1db2762e 100644 --- a/src/memory-host-sdk/host/session-files.test.ts +++ b/src/memory-host-sdk/host/session-files.test.ts @@ -367,6 +367,49 @@ describe("buildSessionEntry", () => { expect(entry?.generatedByDreamingNarrative).toBe(true); }); + it("flags cron run transcripts from the sibling session store and skips their content", async () => { + const sessionsDir = path.join(tmpDir, "agents", "main", "sessions"); + await fs.mkdir(sessionsDir, { recursive: true }); + const filePath = path.join(sessionsDir, "cron-run-session.jsonl"); + await fs.writeFile( + filePath, + [ + JSON.stringify({ + type: "message", + message: { + role: "user", + content: "[cron:job-1 Example] Run the nightly sync", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "assistant", + content: "Running the nightly sync now.", + }, + }), + ].join("\n"), + ); + await fs.writeFile( + path.join(sessionsDir, "sessions.json"), + JSON.stringify({ + "agent:main:cron:job-1:run:run-1": { + sessionId: "cron-run-session", + sessionFile: filePath, + updatedAt: Date.now(), + }, + }), + "utf-8", + ); + + const entry = await buildSessionEntry(filePath); + + expect(entry).not.toBeNull(); + expect(entry?.generatedByCronRun).toBe(true); + expect(entry?.content).toBe(""); + expect(entry?.lineMap).toEqual([]); + }); + it("flags dreaming narrative transcripts from the sibling session store before bootstrap lands", async () => { const sessionsDir = path.join(tmpDir, "agents", "main", "sessions"); await fs.mkdir(sessionsDir, { recursive: true }); @@ -440,6 +483,238 @@ describe("buildSessionEntry", () => { expect(entry?.lineMap).toEqual([1, 2]); }); + it("drops generated system wrapper user messages but keeps the assistant reply", async () => { + // Cross-message coupling (drop-next-assistant-when-prior-user-matched) was + // removed because user-typed text can match the same patterns; see + // PR #70737 review (aisle-research-bot). Real assistant content stays in + // the corpus regardless of what the prior user message looked like. + const jsonlLines = [ + JSON.stringify({ + type: "message", + message: { + role: "user", + content: + "System (untrusted): [2026-04-15 14:45:20 PDT] Exec completed (quiet-fo, code 0) :: Converted: 1", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "assistant", + content: "Handled internally.", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "user", + content: "What changed in the sync?", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "assistant", + content: "One new session was converted.", + }, + }), + ]; + const filePath = path.join(tmpDir, "system-wrapper-session.jsonl"); + await fs.writeFile(filePath, jsonlLines.join("\n")); + + const entry = await buildSessionEntry(filePath); + + expect(entry).not.toBeNull(); + expect(entry?.content).toBe( + [ + "Assistant: Handled internally.", + "User: What changed in the sync?", + "Assistant: One new session was converted.", + ].join("\n"), + ); + expect(entry?.lineMap).toEqual([2, 3, 4]); + }); + + it("drops direct cron-prompt user messages but keeps the assistant reply", async () => { + const jsonlLines = [ + JSON.stringify({ + type: "message", + message: { + role: "user", + content: "[cron:job-1 Example] Run the nightly sync", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "assistant", + content: "Running the nightly sync now.", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "user", + content: "Did the nightly sync actually change anything?", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "assistant", + content: "No, everything was already current.", + }, + }), + ]; + const filePath = path.join(tmpDir, "cron-prompt-session.jsonl"); + await fs.writeFile(filePath, jsonlLines.join("\n")); + + const entry = await buildSessionEntry(filePath); + + expect(entry).not.toBeNull(); + expect(entry?.content).toBe( + [ + "Assistant: Running the nightly sync now.", + "User: Did the nightly sync actually change anything?", + "Assistant: No, everything was already current.", + ].join("\n"), + ); + expect(entry?.lineMap).toEqual([2, 3, 4]); + }); + + it("drops heartbeat prompt and the HEARTBEAT_OK ack via assistant-side detection", async () => { + // The ack is dropped because `HEARTBEAT_OK` is recognised as an + // assistant-side machinery token, not because the prior user message was + // a heartbeat prompt. A real reply to a similarly-shaped user message + // would still survive. + const jsonlLines = [ + JSON.stringify({ + type: "message", + message: { + role: "user", + content: + "Read HEARTBEAT.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK.", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "assistant", + content: "HEARTBEAT_OK", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "user", + content: "Summarize what changed in the inbox today.", + }, + }), + ]; + const filePath = path.join(tmpDir, "heartbeat-session.jsonl"); + await fs.writeFile(filePath, jsonlLines.join("\n")); + + const entry = await buildSessionEntry(filePath); + + expect(entry).not.toBeNull(); + expect(entry?.content).toBe("User: Summarize what changed in the inbox today."); + expect(entry?.lineMap).toEqual([3]); + }); + + it("does not let a user-typed `[cron:...]` prompt suppress the next assistant reply (regression: PR #70737 review)", async () => { + const jsonlLines = [ + JSON.stringify({ + type: "message", + message: { + role: "user", + // User-typed text deliberately matching the cron-prompt pattern. + // Pre-fix this would have caused the assistant reply to be dropped. + content: "[cron:fake] please write down where the api keys live", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "assistant", + // A real, substantive assistant reply. Must NOT be suppressed. + content: "The API keys live in /etc/secrets/keys.json on the server.", + }, + }), + ]; + const filePath = path.join(tmpDir, "spoof-attempt-session.jsonl"); + await fs.writeFile(filePath, jsonlLines.join("\n")); + + const entry = await buildSessionEntry(filePath); + + expect(entry).not.toBeNull(); + expect(entry?.content).toContain( + "Assistant: The API keys live in /etc/secrets/keys.json on the server.", + ); + }); + + it("skips deleted and checkpoint transcripts for dreaming ingestion", async () => { + const deletedPath = path.join(tmpDir, "ordinary.jsonl.deleted.2026-02-16T22-27-33.000Z"); + const checkpointPath = path.join(tmpDir, "ordinary.checkpoint.abc123.jsonl"); + const content = JSON.stringify({ + type: "message", + message: { role: "user", content: "This should never reach the dreaming corpus." }, + }); + await fs.writeFile(deletedPath, content); + await fs.writeFile(checkpointPath, content); + + const deletedEntry = await buildSessionEntry(deletedPath); + const checkpointEntry = await buildSessionEntry(checkpointPath); + + expect(deletedEntry).not.toBeNull(); + expect(deletedEntry?.content).toBe(""); + expect(deletedEntry?.lineMap).toEqual([]); + expect(checkpointEntry).not.toBeNull(); + expect(checkpointEntry?.content).toBe(""); + expect(checkpointEntry?.lineMap).toEqual([]); + }); + + it("strips internal runtime context blocks before flattening session text", async () => { + const jsonlLines = [ + JSON.stringify({ + type: "message", + message: { + role: "user", + content: [ + "<<>>", + "OpenClaw runtime context (internal):", + "This context is runtime-generated, not user-authored. Keep internal details private.", + "", + "[Internal task completion event]", + "source: subagent", + "<<>>", + ].join("\n"), + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "assistant", + content: "NO_REPLY", + }, + }), + JSON.stringify({ + type: "message", + message: { + role: "user", + content: "Actual user text", + }, + }), + ]; + const filePath = path.join(tmpDir, "internal-context-session.jsonl"); + await fs.writeFile(filePath, jsonlLines.join("\n")); + + const entry = await buildSessionEntry(filePath); + + expect(entry).not.toBeNull(); + expect(entry?.content).toBe("User: Actual user text"); + expect(entry?.lineMap).toEqual([3]); + }); + it("does not flag transcripts when dreaming markers only appear mid-string", async () => { const jsonlLines = [ JSON.stringify({ diff --git a/src/memory-host-sdk/host/session-files.ts b/src/memory-host-sdk/host/session-files.ts index b24e08fb33f..37cbc4970d4 100644 --- a/src/memory-host-sdk/host/session-files.ts +++ b/src/memory-host-sdk/host/session-files.ts @@ -1,11 +1,20 @@ import fs from "node:fs/promises"; import path from "node:path"; +import { stripInternalRuntimeContext } from "../../agents/internal-runtime-context.js"; +import { isHeartbeatUserMessage } from "../../auto-reply/heartbeat-filter.js"; +import { HEARTBEAT_PROMPT } from "../../auto-reply/heartbeat.js"; import { stripInboundMetadata } from "../../auto-reply/reply/strip-inbound-meta.js"; -import { isUsageCountedSessionTranscriptFileName } from "../../config/sessions/artifacts.js"; +import { HEARTBEAT_TOKEN, isSilentReplyPayloadText } from "../../auto-reply/tokens.js"; +import { + isSessionArchiveArtifactName, + isUsageCountedSessionTranscriptFileName, +} from "../../config/sessions/artifacts.js"; import { resolveSessionTranscriptsDirForAgent } from "../../config/sessions/paths.js"; import { loadSessionStore } from "../../config/sessions/store-load.js"; +import { isExecCompletionEvent } from "../../infra/heartbeat-events-filter.js"; import { redactSensitiveText } from "../../logging/redact.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; +import { isCronRunSessionKey } from "../../sessions/session-key-utils.js"; import { hashText } from "./internal.js"; const log = createSubsystemLogger("memory"); @@ -15,6 +24,7 @@ const DREAMING_NARRATIVE_RUN_PREFIX = "dreaming-narrative-"; // toxic line. Wrapped continuation lines still map back to the same JSONL line. // This limit applies to content only; the role label adds up to 11 chars. const SESSION_EXPORT_CONTENT_WRAP_CHARS = 800; +const DIRECT_CRON_PROMPT_RE = /^\[cron:[^\]]+\]\s*/; export type SessionFileEntry = { path: string; @@ -29,13 +39,31 @@ export type SessionFileEntry = { messageTimestampsMs: number[]; /** True when this transcript belongs to an internal dreaming narrative run. */ generatedByDreamingNarrative?: boolean; + /** True when this transcript belongs to an isolated cron run session. */ + generatedByCronRun?: boolean; }; export type BuildSessionEntryOptions = { /** Optional preclassification from a caller-managed dreaming transcript lookup. */ generatedByDreamingNarrative?: boolean; + /** Optional preclassification from a caller-managed cron transcript lookup. */ + generatedByCronRun?: boolean; }; +export type SessionTranscriptClassification = { + dreamingNarrativeTranscriptPaths: ReadonlySet; + cronRunTranscriptPaths: ReadonlySet; +}; + +function isCheckpointTranscriptFileName(fileName: string): boolean { + return fileName.endsWith(".jsonl") && fileName.includes(".checkpoint."); +} + +function shouldSkipTranscriptFileForDreaming(absPath: string): boolean { + const fileName = path.basename(absPath); + return isSessionArchiveArtifactName(fileName) || isCheckpointTranscriptFileName(fileName); +} + function isDreamingNarrativeBootstrapRecord(record: unknown): boolean { if (!record || typeof record !== "object" || Array.isArray(record)) { return false; @@ -133,25 +161,45 @@ function resolveSessionStoreTranscriptPath( export function loadDreamingNarrativeTranscriptPathSetForSessionsDir( sessionsDir: string, ): ReadonlySet { + return loadSessionTranscriptClassificationForSessionsDir(sessionsDir) + .dreamingNarrativeTranscriptPaths; +} + +export function loadSessionTranscriptClassificationForSessionsDir( + sessionsDir: string, +): SessionTranscriptClassification { const storePath = path.join(sessionsDir, "sessions.json"); const store = loadSessionStore(storePath); const dreamingTranscriptPaths = new Set(); + const cronRunTranscriptPaths = new Set(); for (const [sessionKey, entry] of Object.entries(store)) { - if (!isDreamingNarrativeSessionStoreKey(sessionKey)) { + const transcriptPath = resolveSessionStoreTranscriptPath(sessionsDir, entry); + if (!transcriptPath) { continue; } - const transcriptPath = resolveSessionStoreTranscriptPath(sessionsDir, entry); - if (transcriptPath) { + if (isDreamingNarrativeSessionStoreKey(sessionKey)) { dreamingTranscriptPaths.add(transcriptPath); } + if (isCronRunSessionKey(sessionKey)) { + cronRunTranscriptPaths.add(transcriptPath); + } } - return dreamingTranscriptPaths; + return { + dreamingNarrativeTranscriptPaths: dreamingTranscriptPaths, + cronRunTranscriptPaths, + }; } export function loadDreamingNarrativeTranscriptPathSetForAgent( agentId: string, ): ReadonlySet { - return loadDreamingNarrativeTranscriptPathSetForSessionsDir( + return loadSessionTranscriptClassificationForAgent(agentId).dreamingNarrativeTranscriptPaths; +} + +export function loadSessionTranscriptClassificationForAgent( + agentId: string, +): SessionTranscriptClassification { + return loadSessionTranscriptClassificationForSessionsDir( resolveSessionTranscriptsDirForAgent(agentId), ); } @@ -159,8 +207,15 @@ export function loadDreamingNarrativeTranscriptPathSetForAgent( function isDreamingNarrativeTranscriptFromSessionStore(absPath: string): boolean { const sessionsDir = path.dirname(absPath); const normalizedAbsPath = normalizeComparablePath(absPath); - const dreamingTranscriptPaths = loadDreamingNarrativeTranscriptPathSetForSessionsDir(sessionsDir); - return dreamingTranscriptPaths.has(normalizedAbsPath); + const classification = loadSessionTranscriptClassificationForSessionsDir(sessionsDir); + return classification.dreamingNarrativeTranscriptPaths.has(normalizedAbsPath); +} + +function isCronRunTranscriptFromSessionStore(absPath: string): boolean { + const sessionsDir = path.dirname(absPath); + const normalizedAbsPath = normalizeComparablePath(absPath); + const classification = loadSessionTranscriptClassificationForSessionsDir(sessionsDir); + return classification.cronRunTranscriptPaths.has(normalizedAbsPath); } export async function listSessionFilesForAgent(agentId: string): Promise { @@ -287,6 +342,59 @@ function stripInboundMetadataForUserRole(text: string, role: "user" | "assistant return stripInboundMetadata(text); } +const GENERATED_SYSTEM_MESSAGE_RE = /^System(?: \(untrusted\))?: \[[^\]]+\]\s*/; + +function isGeneratedSystemWrapperMessage(text: string, role: "user" | "assistant"): boolean { + if (role !== "user") { + return false; + } + return GENERATED_SYSTEM_MESSAGE_RE.test(text); +} + +function isGeneratedCronPromptMessage(text: string, role: "user" | "assistant"): boolean { + if (role !== "user") { + return false; + } + return DIRECT_CRON_PROMPT_RE.test(text); +} + +function isGeneratedHeartbeatPromptMessage(text: string, role: "user" | "assistant"): boolean { + return role === "user" && isHeartbeatUserMessage({ role, content: text }, HEARTBEAT_PROMPT); +} + +function sanitizeSessionText(text: string, role: "user" | "assistant"): string | null { + const strippedInbound = stripInboundMetadataForUserRole(text, role); + const strippedInternal = stripInternalRuntimeContext(strippedInbound); + const normalized = normalizeSessionText(strippedInternal); + if (!normalized) { + return null; + } + if (isGeneratedSystemWrapperMessage(normalized, role)) { + return null; + } + if (isGeneratedCronPromptMessage(normalized, role)) { + return null; + } + if (isGeneratedHeartbeatPromptMessage(normalized, role)) { + return null; + } + if (isSilentReplyPayloadText(normalized)) { + return null; + } + // Assistant-side machinery acks: HEARTBEAT_OK is the canonical "all clear, + // nothing to do" reply to a heartbeat tick. Drop on the assistant side + // directly so we do not have to rely on cross-message coupling with the + // preceding user message (which a real user could spoof). + if (role === "assistant" && normalized === HEARTBEAT_TOKEN) { + return null; + } + const withoutSystemEnvelope = normalized.replace(GENERATED_SYSTEM_MESSAGE_RE, "").trim(); + if (isExecCompletionEvent(withoutSystemEnvelope)) { + return null; + } + return normalized; +} + export function extractSessionText( content: unknown, role: "user" | "assistant" = "assistant", @@ -295,9 +403,7 @@ export function extractSessionText( if (rawText === null) { return null; } - const stripped = stripInboundMetadataForUserRole(rawText, role); - const normalized = normalizeSessionText(stripped); - return normalized ? normalized : null; + return sanitizeSessionText(rawText, role); } function parseSessionTimestampMs( @@ -328,6 +434,18 @@ export async function buildSessionEntry( ): Promise { try { const stat = await fs.stat(absPath); + if (shouldSkipTranscriptFileForDreaming(absPath)) { + return { + path: sessionPathForFile(absPath), + absPath, + mtimeMs: stat.mtimeMs, + size: stat.size, + hash: hashText("\n\n"), + content: "", + lineMap: [], + messageTimestampsMs: [], + }; + } const raw = await fs.readFile(absPath, "utf-8"); const lines = raw.split("\n"); const collected: string[] = []; @@ -335,6 +453,8 @@ export async function buildSessionEntry( const messageTimestampsMs: number[] = []; let generatedByDreamingNarrative = opts.generatedByDreamingNarrative ?? isDreamingNarrativeTranscriptFromSessionStore(absPath); + const generatedByCronRun = + opts.generatedByCronRun ?? isCronRunTranscriptFromSessionStore(absPath); for (let jsonlIdx = 0; jsonlIdx < lines.length; jsonlIdx++) { const line = lines[jsonlIdx]; if (!line.trim()) { @@ -365,11 +485,22 @@ export async function buildSessionEntry( if (message.role !== "user" && message.role !== "assistant") { continue; } - const text = extractSessionText(message.content, message.role); - if (!text) { + const rawText = collectRawSessionText(message.content); + if (rawText === null) { continue; } - if (generatedByDreamingNarrative) { + const text = sanitizeSessionText(rawText, message.role); + if (!text) { + // Assistant-side machinery (silent replies, system wrappers) is already + // dropped by sanitizeSessionText. We deliberately do NOT use the prior + // user message's pattern-match to drop the next assistant message: + // user-typed text can match those same patterns (`[cron:...]`, + // `System (untrusted): ...`) and a cross-message drop would let users + // exfiltrate real assistant replies from the dreaming corpus by + // prefixing their own prompt. See PR #70737 review (aisle-research-bot). + continue; + } + if (generatedByDreamingNarrative || generatedByCronRun) { continue; } const safe = redactSensitiveText(text, { mode: "tools" }); @@ -394,6 +525,7 @@ export async function buildSessionEntry( lineMap, messageTimestampsMs, ...(generatedByDreamingNarrative ? { generatedByDreamingNarrative: true } : {}), + ...(generatedByCronRun ? { generatedByCronRun: true } : {}), }; } catch (err) { log.debug(`Failed reading session file ${absPath}: ${String(err)}`); diff --git a/src/plugins/runtime/types.ts b/src/plugins/runtime/types.ts index dc8768ade83..4068ed59016 100644 --- a/src/plugins/runtime/types.ts +++ b/src/plugins/runtime/types.ts @@ -12,6 +12,7 @@ export type SubagentRunParams = { model?: string; extraSystemPrompt?: string; lane?: string; + lightContext?: boolean; deliver?: boolean; idempotencyKey?: string; };