From de0d484236fc52cbcb8142386db04824fe934bfc Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 2 May 2026 06:30:36 +0100 Subject: [PATCH] fix(sessions): preserve durable conversation entries --- CHANGELOG.md | 1 + docs/cli/sessions.md | 2 +- docs/concepts/session.md | 4 + .../session-management-compaction.md | 5 + src/config/sessions/disk-budget.test.ts | 46 ++++++++ src/config/sessions/disk-budget.ts | 4 + src/config/sessions/store-maintenance.ts | 104 ++++++++++++++++-- .../store.pruning.integration.test.ts | 30 +++++ src/config/sessions/store.pruning.test.ts | 95 ++++++++++++++++ 9 files changed, 283 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f6ad34b619e..b5c71551f27 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ Docs: https://docs.openclaw.ai - Plugins/ClawHub: preserve official source-linked trust through archive installs, so OpenClaw can install trusted ClawHub plugin packages that trigger the built-in dangerous-pattern scanner. Thanks @vincentkoc. - Providers/LM Studio: allow `models.providers.lmstudio.params.preload: false` to skip OpenClaw's native model-load call so LM Studio JIT loading, idle TTL, and auto-evict can own model lifecycle. Fixes #75921. Thanks @garyd9. - Telegram: inherit the process DNS result order for Bot API transport and downgrade recovered sticky IPv4 fallback promotions to debug logs, while keeping pinned-IP escalation warnings visible. Fixes #75904. Thanks @highfly-hi and @neeravmakwana. +- Sessions: keep durable external conversation pointers, including group and thread-scoped chat sessions, out of age, count, and disk-budget maintenance eviction while still allowing synthetic runtime entries to age out. Fixes #58088. Thanks @drinkflav. - Web search/MiniMax: allow `MINIMAX_OAUTH_TOKEN` to satisfy MiniMax Search credentials, so OAuth-authorized MiniMax Token Plan setups do not need a separate web-search key. Fixes #65768. Thanks @kikibrian and @zhouhe-xydt. - Providers/MiniMax: derive Coding Plan usage polling from the configured MiniMax base URL, so global setups no longer query the CN usage host. Fixes #65054. Thanks @sixone74 and @Yanhu007. - Control UI/WebChat: skip assistant-media transcript supplements when stale media refs resolve to no playable media, so text-only final replies are not stored a second time as gateway-injected assistant messages. Fixes #73956. Thanks @HemantSudarshan. diff --git a/docs/cli/sessions.md b/docs/cli/sessions.md index 4490fdd69c7..e6ede516ae1 100644 --- a/docs/cli/sessions.md +++ b/docs/cli/sessions.md @@ -85,7 +85,7 @@ openclaw sessions cleanup --json - In text mode, dry-run prints a per-session action table (`Action`, `Key`, `Age`, `Model`, `Flags`) so you can see what would be kept vs removed. - `--enforce`: apply maintenance even when `session.maintenance.mode` is `warn`. - `--fix-missing`: remove entries whose transcript files are missing, even if they would not normally age/count out yet. -- `--active-key `: protect a specific active key from disk-budget eviction. +- `--active-key `: protect a specific active key from disk-budget eviction. Durable external conversation pointers, such as group sessions and thread-scoped chat sessions, are also kept by age/count/disk-budget maintenance. - `--agent `: run cleanup for one configured agent store. - `--all-agents`: run cleanup for all configured agent stores. - `--store `: run against a specific `sessions.json` file. diff --git a/docs/concepts/session.md b/docs/concepts/session.md index d3bd792692e..0ac9d1a9a7a 100644 --- a/docs/concepts/session.md +++ b/docs/concepts/session.md @@ -127,6 +127,10 @@ to `"enforce"` for automatic cleanup: For production-sized `maxEntries` limits, Gateway runtime writes use a small high-water buffer and clean back down to the configured cap in batches. Session store reads do not prune or cap entries during Gateway startup. This avoids running full store cleanup on every startup or isolated cron session. `openclaw sessions cleanup --enforce` applies the cap immediately. +Maintenance preserves durable external conversation pointers, including group +sessions and thread-scoped chat sessions, while still allowing synthetic cron, +hook, heartbeat, ACP, and sub-agent entries to age out. + Preview with `openclaw sessions cleanup --dry-run`. ## Inspecting sessions diff --git a/docs/reference/session-management-compaction.md b/docs/reference/session-management-compaction.md index 946eaa4de2d..32b92e2b334 100644 --- a/docs/reference/session-management-compaction.md +++ b/docs/reference/session-management-compaction.md @@ -81,6 +81,11 @@ Session persistence has automatic maintenance controls (`session.maintenance`) f Normal Gateway writes batch `maxEntries` cleanup for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. Session store reads do not prune or cap entries during Gateway startup; use writes or `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` still applies the configured cap immediately. +Maintenance keeps durable external conversation pointers such as group sessions +and thread-scoped chat sessions, but synthetic runtime entries for cron, hooks, +heartbeat, ACP, and sub-agents can still be removed when they exceed the +configured age, count, or disk budget. + OpenClaw no longer creates automatic `sessions.json.bak.*` rotation backups during Gateway writes. The legacy `session.maintenance.rotateBytes` key is ignored and `openclaw doctor --fix` removes it from older configs. Enforcement order for disk budget cleanup (`mode: "enforce"`): diff --git a/src/config/sessions/disk-budget.test.ts b/src/config/sessions/disk-budget.test.ts index 175e9facb59..9e2887075b9 100644 --- a/src/config/sessions/disk-budget.test.ts +++ b/src/config/sessions/disk-budget.test.ts @@ -196,4 +196,50 @@ describe("enforceSessionDiskBudget", () => { ); }); }); + + it("does not evict protected thread session entries under store pressure", async () => { + await withTempDir({ prefix: "openclaw-disk-budget-" }, async (dir) => { + const storePath = path.join(dir, "sessions.json"); + const protectedKey = "agent:main:slack:channel:C123:thread:1710000000.000100"; + const removableKey = "agent:main:subagent:old-worker"; + const activeKey = "agent:main:main"; + const store: Record = { + [protectedKey]: { + sessionId: "protected-thread", + updatedAt: 1, + displayName: "p".repeat(2000), + }, + [removableKey]: { + sessionId: "removable-worker", + updatedAt: 2, + displayName: "r".repeat(2000), + }, + [activeKey]: { + sessionId: "active", + updatedAt: 3, + }, + }; + await fs.writeFile(storePath, JSON.stringify(store, null, 2), "utf-8"); + + const result = await enforceSessionDiskBudget({ + store, + storePath, + activeSessionKey: activeKey, + maintenance: { + maxDiskBytes: 1000, + highWaterBytes: 500, + }, + warnOnly: false, + }); + + expect(store[protectedKey]).toBeDefined(); + expect(store[removableKey]).toBeUndefined(); + expect(store[activeKey]).toBeDefined(); + expect(result).toEqual( + expect.objectContaining({ + removedEntries: 1, + }), + ); + }); + }); }); diff --git a/src/config/sessions/disk-budget.ts b/src/config/sessions/disk-budget.ts index eacfb515d5a..f201413e93e 100644 --- a/src/config/sessions/disk-budget.ts +++ b/src/config/sessions/disk-budget.ts @@ -15,6 +15,7 @@ import { isTrajectorySessionArtifactName, } from "./artifacts.js"; import { resolveSessionFilePath } from "./paths.js"; +import { isProtectedSessionMaintenanceEntry } from "./store-maintenance.js"; import type { SessionEntry } from "./types.js"; export type SessionDiskBudgetConfig = { @@ -346,6 +347,9 @@ export async function enforceSessionDiskBudget(params: { if (!entry) { continue; } + if (isProtectedSessionMaintenanceEntry(key, entry)) { + continue; + } const previousProjectedBytes = projectedStoreBytes; delete params.store[key]; const chunkBytes = entryChunkBytesByKey.get(key); diff --git a/src/config/sessions/store-maintenance.ts b/src/config/sessions/store-maintenance.ts index b2d5465d3a1..39f6526da67 100644 --- a/src/config/sessions/store-maintenance.ts +++ b/src/config/sessions/store-maintenance.ts @@ -1,8 +1,18 @@ import { parseByteSize } from "../../cli/parse-bytes.js"; import { parseDurationMs } from "../../cli/parse-duration.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; -import { normalizeStringifiedOptionalString } from "../../shared/string-coerce.js"; +import { + isAcpSessionKey, + isCronSessionKey, + isSubagentSessionKey, + parseAgentSessionKey, +} from "../../sessions/session-key-utils.js"; +import { + normalizeLowercaseStringOrEmpty, + normalizeStringifiedOptionalString, +} from "../../shared/string-coerce.js"; import type { SessionMaintenanceConfig, SessionMaintenanceMode } from "../types.base.js"; +import { parseSessionThreadInfoFast } from "./thread-info.js"; import type { SessionEntry } from "./types.js"; const log = createSubsystemLogger("sessions/store"); @@ -176,7 +186,7 @@ export function pruneStaleEntries( const cutoffMs = Date.now() - maxAgeMs; let pruned = 0; for (const [key, entry] of Object.entries(store)) { - if (opts.preserveKeys?.has(key)) { + if (shouldPreserveMaintenanceEntry({ key, entry, preserveKeys: opts.preserveKeys })) { continue; } if (entry?.updatedAt != null && entry.updatedAt < cutoffMs) { @@ -195,6 +205,64 @@ function getEntryUpdatedAt(entry?: SessionEntry): number { return entry?.updatedAt ?? Number.NEGATIVE_INFINITY; } +function isSyntheticSessionMaintenanceKey(sessionKey: string): boolean { + const parsed = parseAgentSessionKey(sessionKey); + const rest = normalizeLowercaseStringOrEmpty(parsed?.rest ?? sessionKey); + return ( + isSubagentSessionKey(sessionKey) || + isAcpSessionKey(sessionKey) || + isCronSessionKey(sessionKey) || + rest.startsWith("hook:") || + rest.startsWith("node:") || + rest === "heartbeat" || + rest.endsWith(":heartbeat") || + rest.includes(":heartbeat:") + ); +} + +function isTelegramTopicSessionKey(sessionKey: string): boolean { + const parsed = parseAgentSessionKey(sessionKey); + const rest = normalizeLowercaseStringOrEmpty(parsed?.rest ?? sessionKey); + return /^telegram:(?:group|channel|direct|dm):.+:topic:[^:]+$/.test(rest); +} + +function isExternalGroupOrChannelSessionKey(sessionKey: string): boolean { + const parsed = parseAgentSessionKey(sessionKey); + const rest = normalizeLowercaseStringOrEmpty(parsed?.rest ?? sessionKey); + return /^[^:]+:(?:group|channel):.+$/.test(rest); +} + +export function isProtectedSessionMaintenanceEntry( + sessionKey: string, + entry: SessionEntry | undefined, +): boolean { + if (isSyntheticSessionMaintenanceKey(sessionKey)) { + return false; + } + if (parseSessionThreadInfoFast(sessionKey).threadId) { + return true; + } + if (isTelegramTopicSessionKey(sessionKey)) { + return true; + } + if (isExternalGroupOrChannelSessionKey(sessionKey)) { + return true; + } + const chatType = normalizeLowercaseStringOrEmpty(entry?.chatType ?? entry?.origin?.chatType); + return chatType === "group" || chatType === "channel" || chatType === "thread"; +} + +function shouldPreserveMaintenanceEntry(params: { + key: string; + entry: SessionEntry | undefined; + preserveKeys?: ReadonlySet; +}): boolean { + return ( + params.preserveKeys?.has(params.key) === true || + isProtectedSessionMaintenanceEntry(params.key, params.entry) + ); +} + export function getActiveSessionMaintenanceWarning(params: { store: Record; activeSessionKey: string; @@ -210,6 +278,9 @@ export function getActiveSessionMaintenanceWarning(params: { if (!activeEntry) { return null; } + if (isProtectedSessionMaintenanceEntry(activeSessionKey, activeEntry)) { + return null; + } const now = params.nowMs ?? Date.now(); const cutoffMs = now - params.pruneAfterMs; const wouldPrune = activeEntry.updatedAt != null ? activeEntry.updatedAt < cutoffMs : false; @@ -251,6 +322,15 @@ function wouldCapActiveSession(params: { return true; } + const protectedCount = params.keys.filter( + (key) => + key !== params.activeSessionKey && isProtectedSessionMaintenanceEntry(key, params.store[key]), + ).length; + const maxRemovableEntries = Math.max(0, params.maxEntries - protectedCount); + if (maxRemovableEntries <= 0) { + return true; + } + const activeUpdatedAt = getEntryUpdatedAt(params.activeEntry); let newerOrTieBeforeActive = 0; let seenActive = false; @@ -259,10 +339,13 @@ function wouldCapActiveSession(params: { seenActive = true; continue; } + if (isProtectedSessionMaintenanceEntry(key, params.store[key])) { + continue; + } const entryUpdatedAt = getEntryUpdatedAt(params.store[key]); if (entryUpdatedAt > activeUpdatedAt || (!seenActive && entryUpdatedAt === activeUpdatedAt)) { newerOrTieBeforeActive++; - if (newerOrTieBeforeActive >= params.maxEntries) { + if (newerOrTieBeforeActive >= maxRemovableEntries) { return true; } } @@ -286,11 +369,18 @@ export function capEntryCount( } = {}, ): number { const maxEntries = overrideMax ?? resolveMaintenanceConfigFromInput().maxEntries; - const preservedCount = opts.preserveKeys - ? Object.keys(store).filter((key) => opts.preserveKeys?.has(key)).length - : 0; + const preservedCount = Object.entries(store).filter(([key, entry]) => + shouldPreserveMaintenanceEntry({ key, entry, preserveKeys: opts.preserveKeys }), + ).length; const maxRemovableEntries = Math.max(0, maxEntries - preservedCount); - const keys = Object.keys(store).filter((key) => !opts.preserveKeys?.has(key)); + const keys = Object.keys(store).filter( + (key) => + !shouldPreserveMaintenanceEntry({ + key, + entry: store[key], + preserveKeys: opts.preserveKeys, + }), + ); if (keys.length <= maxRemovableEntries) { return 0; } diff --git a/src/config/sessions/store.pruning.integration.test.ts b/src/config/sessions/store.pruning.integration.test.ts index aacf25ecdac..4578c143759 100644 --- a/src/config/sessions/store.pruning.integration.test.ts +++ b/src/config/sessions/store.pruning.integration.test.ts @@ -415,6 +415,36 @@ describe("Integration: saveSessionStore with pruning", () => { expect(loaded["session-74"]).toBeUndefined(); }); + it("explicit loadSessionStore maintenance preserves channel, thread, and topic session pointers", async () => { + const now = Date.now(); + const channelKey = "agent:main:slack:channel:C123"; + const threadKey = "agent:main:discord:channel:123456:thread:987654"; + const topicKey = "agent:main:telegram:group:-100123:topic:77"; + const store = Object.fromEntries( + Array.from({ length: 75 }, (_, index) => [`session-${index}`, makeEntry(now - index)]), + ); + store[channelKey] = makeEntry(now - 99 * DAY_MS); + store[threadKey] = makeEntry(now - 100 * DAY_MS); + store[topicKey] = makeEntry(now - 101 * DAY_MS); + await fs.writeFile(storePath, JSON.stringify(store), "utf-8"); + + const loaded = loadSessionStore(storePath, { + skipCache: true, + runMaintenance: true, + maintenanceConfig: { + ...ENFORCED_MAINTENANCE_OVERRIDE, + maxEntries: 50, + pruneAfterMs: 365 * DAY_MS, + }, + }); + + expect(Object.keys(loaded)).toHaveLength(50); + expect(loaded[channelKey]).toBeDefined(); + expect(loaded[threadKey]).toBeDefined(); + expect(loaded[topicKey]).toBeDefined(); + expect(loaded["session-74"]).toBeUndefined(); + }); + it("updateSessionStore batches cap-hit maintenance instead of pruning every new session", async () => { const now = Date.now(); const store = Object.fromEntries( diff --git a/src/config/sessions/store.pruning.test.ts b/src/config/sessions/store.pruning.test.ts index c069f799453..43651755e36 100644 --- a/src/config/sessions/store.pruning.test.ts +++ b/src/config/sessions/store.pruning.test.ts @@ -2,6 +2,7 @@ import crypto from "node:crypto"; import { afterAll, beforeAll, describe, expect, it } from "vitest"; import { createFixtureSuite } from "../../test-utils/fixture-suite.js"; import { + isProtectedSessionMaintenanceEntry, resolveMaintenanceConfigFromInput, resolveSessionEntryMaintenanceHighWater, } from "./store-maintenance.js"; @@ -47,6 +48,28 @@ describe("pruneStaleEntries", () => { expect(store.old).toBeUndefined(); expect(store.fresh).toBeDefined(); }); + + it("preserves durable external conversation entries", () => { + const now = Date.now(); + const store = makeStore([ + ["old", makeEntry(now - 31 * DAY_MS)], + ["agent:main:slack:channel:C123:thread:1710000000.000100", makeEntry(now - 31 * DAY_MS)], + ["agent:main:telegram:group:-100123:topic:77", makeEntry(now - 31 * DAY_MS)], + ["agent:main:slack:channel:C999", makeEntry(now - 31 * DAY_MS)], + ["agent:main:telegram:group:-100123", { ...makeEntry(now - 31 * DAY_MS), chatType: "group" }], + ["agent:main:discord:channel:ops", { ...makeEntry(now - 31 * DAY_MS), chatType: "channel" }], + ]); + + const pruned = pruneStaleEntries(store, 30 * DAY_MS); + + expect(pruned).toBe(1); + expect(store.old).toBeUndefined(); + expect(store["agent:main:slack:channel:C123:thread:1710000000.000100"]).toBeDefined(); + expect(store["agent:main:telegram:group:-100123:topic:77"]).toBeDefined(); + expect(store["agent:main:slack:channel:C999"]).toBeDefined(); + expect(store["agent:main:telegram:group:-100123"]).toBeDefined(); + expect(store["agent:main:discord:channel:ops"]).toBeDefined(); + }); }); describe("capEntryCount", () => { @@ -70,6 +93,78 @@ describe("capEntryCount", () => { expect(store.oldest).toBeUndefined(); expect(store.old).toBeUndefined(); }); + + it("preserves durable external conversation entries when capping", () => { + const now = Date.now(); + const threadKey = "agent:main:discord:channel:123456:thread:987654"; + const store = makeStore([ + [threadKey, makeEntry(now - 5 * DAY_MS)], + ["oldest", makeEntry(now - 4 * DAY_MS)], + ["old", makeEntry(now - 3 * DAY_MS)], + ["recent", makeEntry(now - 1 * DAY_MS)], + ["newest", makeEntry(now)], + ]); + + const evicted = capEntryCount(store, 3); + + expect(evicted).toBe(2); + expect(Object.keys(store)).toHaveLength(3); + expect(store[threadKey]).toBeDefined(); + expect(store.newest).toBeDefined(); + expect(store.recent).toBeDefined(); + expect(store.oldest).toBeUndefined(); + expect(store.old).toBeUndefined(); + }); +}); + +describe("isProtectedSessionMaintenanceEntry", () => { + it("does not protect synthetic sessions just because they carry group metadata", () => { + expect( + isProtectedSessionMaintenanceEntry("agent:main:subagent:worker", { + ...makeEntry(Date.now()), + chatType: "group", + }), + ).toBe(false); + expect( + isProtectedSessionMaintenanceEntry("agent:main:cron:job:run:123", { + ...makeEntry(Date.now()), + origin: { chatType: "group" }, + }), + ).toBe(false); + }); + + it("protects metadata-less Telegram topic keys without treating every :topic: id as a thread", () => { + expect( + isProtectedSessionMaintenanceEntry( + "agent:main:telegram:group:-100123:topic:77", + makeEntry(Date.now()), + ), + ).toBe(true); + expect( + isProtectedSessionMaintenanceEntry( + "agent:main:opaque:topic:om_topic_root:sender:ou_topic_user", + makeEntry(Date.now()), + ), + ).toBe(false); + }); + + it("protects metadata-less channel session keys and channel chat metadata", () => { + expect( + isProtectedSessionMaintenanceEntry("agent:main:slack:channel:C123", makeEntry(Date.now())), + ).toBe(true); + expect( + isProtectedSessionMaintenanceEntry( + "agent:main:custom:channel:room-one:with:colon", + makeEntry(Date.now()), + ), + ).toBe(true); + expect( + isProtectedSessionMaintenanceEntry("agent:main:opaque", { + ...makeEntry(Date.now()), + chatType: "channel", + }), + ).toBe(true); + }); }); describe("resolveMaintenanceConfigFromInput", () => {