mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:50:43 +00:00
fix(gateway): reduce session-store clone memory growth
## Summary

- Addresses the remaining Gateway RSS/session-accumulation path tracked by #54155.
- Narrows the fix to the structuredClone/session-store cache memory growth described in #45438.
- Preserves prior report context from #57699, #62717, #66886, #69977, and #70717 as validation evidence.

## Validation

- pnpm -s vitest run src/config/sessions/store.pruning.test.ts src/config/sessions/store.pruning.integration.test.ts src/gateway/sessions-resolve-store.test.ts
- pnpm check:changed

## Credit

Thanks @the-lobsternaut for #54155 and @markus-lassfolk plus the #45438 commenters for isolating the structuredClone/native-memory behavior.

ProjectClownfish replacement details:
- Cluster: ghcrawl-156648-autonomous-smoke
- Source PRs: none
- Credit: Credit #54155 reporter @the-lobsternaut for the multi-day Gateway RSS/session-accumulation report; credit #45438 reporter @markus-lassfolk and commenters for isolating the structuredClone/session-store native-memory path; preserve prior closed-report context from #57699, #62717, #66886, #69977, and #70717 in the PR body as reproduction evidence, not as new close targets.
- Validation: pnpm -s vitest run src/config/sessions/store.pruning.test.ts src/config/sessions/store.pruning.integration.test.ts src/gateway/sessions-resolve-store.test.ts; pnpm check:changed
This commit is contained in:
@@ -199,6 +199,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
- CLI/channel-setup: auto-skip the redundant "Install \<plugin\>?" confirmation when only one install source (npm or local) exists, show `download from <npm-spec>` hints for installable catalog channels in the picker, and suppress misleading npm hints for already-bundled channels. Fixes #73419. Thanks @sliverp.
|
||||
- BlueBubbles: tighten DM-vs-group routing across the outbound session route (`chat_guid:iMessage;-;...` DMs no longer classified as groups), reaction handling (drop group reactions that arrive without any chat identifier instead of synthesizing a `"group"` literal peerId), inbound `chatGuid` fallback (no longer fall back to the sender's DM chatGuid when resolving a group whose webhook omits chatGuid+chatId+chatIdentifier), and short message id resolution (carry caller chat context so a numeric short id reused after a long group conversation cannot silently resolve to a message in a different chat, with the same cross-chat guard applied to full GUIDs so retries cannot bypass it). Thanks @zqchris.
|
||||
- Gateway/sessions: clone cached session stores through the persisted JSON shape instead of `structuredClone`, reducing native-memory growth on the remaining #54155 Gateway RSS/session-accumulation path while keeping #54155 as the broader tracker and carrying forward the #45438 session-cache hypothesis. Thanks @vincentkoc and the #45438 reporters/commenters.
|
||||
- Agents/approvals: fail restart-interrupted sessions whose transcript tail is still `approval-pending` instead of replaying stale exec approval IDs into the new Gateway process after restart. Fixes #65486. Thanks @mjmai20682068-create.
|
||||
- CLI/Gateway: use method-specific least-privilege scopes for classified CLI Gateway calls while preserving legacy broad scopes for unclassified plugin methods, so read-only commands no longer create admin/write/pairing scope-upgrade prompts. Fixes #68634. Thanks @nightmusher.
|
||||
- Gateway/sessions: align `chat.history` and `sessions.list` thinking defaults with owning-agent and catalog-aware resolution so Control UI session defaults match backend runtime state. (#63418) Thanks @jpreagan.
|
||||
|
||||
@@ -2,6 +2,7 @@ import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { createSuiteTempRootTracker } from "../test-helpers/temp-dir.js";
|
||||
import { readSessionStoreCache, writeSessionStoreCache } from "./sessions/store-cache.js";
|
||||
import {
|
||||
clearSessionStoreCacheForTest,
|
||||
loadSessionStore,
|
||||
@@ -107,6 +108,121 @@ describe("Session Store Cache", () => {
|
||||
expect(loaded2["session:1"].skillsSnapshot?.skills?.[0]?.name).toBe("alpha");
|
||||
});
|
||||
|
||||
it("does not cache pre-migration or pre-normalization disk JSON", () => {
|
||||
fs.writeFileSync(
|
||||
storePath,
|
||||
JSON.stringify({
|
||||
"session:1": {
|
||||
sessionId: "id-1",
|
||||
updatedAt: Date.now(),
|
||||
provider: "telegram",
|
||||
room: "room-1",
|
||||
modelProvider: " openai ",
|
||||
model: " gpt-5.4 ",
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
const loaded1 = loadSessionStore(storePath);
|
||||
const entry1 = loaded1["session:1"] as SessionEntry & { provider?: string; room?: string };
|
||||
expect(entry1.channel).toBe("telegram");
|
||||
expect(entry1.groupChannel).toBe("room-1");
|
||||
expect(entry1.provider).toBeUndefined();
|
||||
expect(entry1.room).toBeUndefined();
|
||||
expect(entry1.modelProvider).toBe("openai");
|
||||
expect(entry1.model).toBe("gpt-5.4");
|
||||
|
||||
const loaded2 = loadSessionStore(storePath);
|
||||
const entry2 = loaded2["session:1"] as SessionEntry & { provider?: string; room?: string };
|
||||
expect(entry2.channel).toBe("telegram");
|
||||
expect(entry2.groupChannel).toBe("room-1");
|
||||
expect(entry2.provider).toBeUndefined();
|
||||
expect(entry2.room).toBeUndefined();
|
||||
expect(entry2.modelProvider).toBe("openai");
|
||||
expect(entry2.model).toBe("gpt-5.4");
|
||||
});
|
||||
|
||||
it("isolates cached session stores without structuredClone", async () => {
|
||||
const structuredCloneSpy = vi.spyOn(globalThis, "structuredClone");
|
||||
const testStore = createSingleSessionStore(
|
||||
createSessionEntry({
|
||||
origin: { provider: "openai" },
|
||||
skillsSnapshot: {
|
||||
prompt: "skills",
|
||||
skills: [{ name: "alpha" }],
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
await saveSessionStore(storePath, testStore);
|
||||
|
||||
const loaded1 = loadSessionStore(storePath);
|
||||
loaded1["session:1"].origin = { provider: "mutated" };
|
||||
if (loaded1["session:1"].skillsSnapshot?.skills?.length) {
|
||||
loaded1["session:1"].skillsSnapshot.skills[0].name = "mutated";
|
||||
}
|
||||
|
||||
const loaded2 = loadSessionStore(storePath);
|
||||
expect(loaded2["session:1"].origin?.provider).toBe("openai");
|
||||
expect(loaded2["session:1"].skillsSnapshot?.skills?.[0]?.name).toBe("alpha");
|
||||
expect(structuredCloneSpy).not.toHaveBeenCalled();
|
||||
|
||||
structuredCloneSpy.mockRestore();
|
||||
});
|
||||
|
||||
it("does not parse serialized stores when writing the cache", () => {
|
||||
const testStore = createSingleSessionStore(
|
||||
createSessionEntry({
|
||||
origin: { provider: "openai" },
|
||||
}),
|
||||
);
|
||||
const serialized = JSON.stringify(testStore);
|
||||
const parseSpy = vi.spyOn(JSON, "parse");
|
||||
|
||||
writeSessionStoreCache({ storePath, store: testStore, serialized });
|
||||
|
||||
expect(parseSpy).not.toHaveBeenCalled();
|
||||
|
||||
testStore["session:1"].origin = { provider: "mutated" };
|
||||
const cached = readSessionStoreCache({ storePath });
|
||||
|
||||
expect(cached?.["session:1"].origin?.provider).toBe("openai");
|
||||
expect(parseSpy).toHaveBeenCalledTimes(1);
|
||||
|
||||
parseSpy.mockRestore();
|
||||
});
|
||||
|
||||
it("clones disk-loaded stores from the raw serialized JSON", () => {
|
||||
const testStore = createSingleSessionStore(
|
||||
createSessionEntry({
|
||||
origin: { provider: "openai" },
|
||||
skillsSnapshot: {
|
||||
prompt: "skills",
|
||||
skills: [{ name: "alpha" }],
|
||||
},
|
||||
}),
|
||||
);
|
||||
const serialized = JSON.stringify(testStore);
|
||||
fs.writeFileSync(storePath, serialized);
|
||||
|
||||
const stringifySpy = vi.spyOn(JSON, "stringify");
|
||||
const loaded = loadSessionStore(storePath, { skipCache: true });
|
||||
|
||||
expect(loaded).toEqual(testStore);
|
||||
expect(stringifySpy).not.toHaveBeenCalled();
|
||||
|
||||
loaded["session:1"].origin = { provider: "mutated" };
|
||||
if (loaded["session:1"].skillsSnapshot?.skills?.length) {
|
||||
loaded["session:1"].skillsSnapshot.skills[0].name = "mutated";
|
||||
}
|
||||
|
||||
const reloaded = loadSessionStore(storePath, { skipCache: true });
|
||||
expect(reloaded["session:1"].origin?.provider).toBe("openai");
|
||||
expect(reloaded["session:1"].skillsSnapshot?.skills?.[0]?.name).toBe("alpha");
|
||||
|
||||
stringifySpy.mockRestore();
|
||||
});
|
||||
|
||||
it("should refresh cache when store file changes on disk", async () => {
|
||||
const testStore = createSingleSessionStore();
|
||||
|
||||
|
||||
@@ -15,6 +15,13 @@ const SESSION_STORE_CACHE = createExpiringMapCache<string, SessionStoreCacheEntr
|
||||
});
|
||||
const SESSION_STORE_SERIALIZED_CACHE = new Map<string, string>();
|
||||
|
||||
export function cloneSessionStoreRecord(
|
||||
store: Record<string, SessionEntry>,
|
||||
serialized?: string,
|
||||
): Record<string, SessionEntry> {
|
||||
return JSON.parse(serialized ?? JSON.stringify(store)) as Record<string, SessionEntry>;
|
||||
}
|
||||
|
||||
export function getSessionStoreTtl(): number {
|
||||
return resolveCacheTtlMs({
|
||||
envValue: process.env.OPENCLAW_SESSION_CACHE_TTL_MS,
|
||||
@@ -65,7 +72,7 @@ export function readSessionStoreCache(params: {
|
||||
invalidateSessionStoreCache(params.storePath);
|
||||
return null;
|
||||
}
|
||||
return structuredClone(cached.store);
|
||||
return cloneSessionStoreRecord(cached.store, cached.serialized);
|
||||
}
|
||||
|
||||
export function writeSessionStoreCache(params: {
|
||||
@@ -76,7 +83,7 @@ export function writeSessionStoreCache(params: {
|
||||
serialized?: string;
|
||||
}): void {
|
||||
SESSION_STORE_CACHE.set(params.storePath, {
|
||||
store: structuredClone(params.store),
|
||||
store: params.serialized === undefined ? cloneSessionStoreRecord(params.store) : params.store,
|
||||
mtimeMs: params.mtimeMs,
|
||||
sizeBytes: params.sizeBytes,
|
||||
serialized: params.serialized,
|
||||
|
||||
@@ -3,6 +3,7 @@ import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.shared.js";
|
||||
import { getFileStatSnapshot } from "../cache-utils.js";
|
||||
import {
|
||||
cloneSessionStoreRecord,
|
||||
isSessionStoreCacheEnabled,
|
||||
readSessionStoreCache,
|
||||
setSerializedSessionStore,
|
||||
@@ -63,7 +64,8 @@ function normalizeSessionEntryDelivery(entry: SessionEntry): SessionEntry {
|
||||
};
|
||||
}
|
||||
|
||||
export function normalizeSessionStore(store: Record<string, SessionEntry>): void {
|
||||
export function normalizeSessionStore(store: Record<string, SessionEntry>): boolean {
|
||||
let changed = false;
|
||||
for (const [key, entry] of Object.entries(store)) {
|
||||
if (!entry) {
|
||||
continue;
|
||||
@@ -71,8 +73,10 @@ export function normalizeSessionStore(store: Record<string, SessionEntry>): void
|
||||
const normalized = normalizeSessionEntryDelivery(normalizeSessionRuntimeModelFields(entry));
|
||||
if (normalized !== entry) {
|
||||
store[key] = normalized;
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
export function loadSessionStore(
|
||||
@@ -122,14 +126,11 @@ export function loadSessionStore(
|
||||
}
|
||||
}
|
||||
|
||||
if (serializedFromDisk !== undefined) {
|
||||
setSerializedSessionStore(storePath, serializedFromDisk);
|
||||
} else {
|
||||
setSerializedSessionStore(storePath, undefined);
|
||||
const migrated = applySessionStoreMigrations(store);
|
||||
const normalized = normalizeSessionStore(store);
|
||||
if (migrated || normalized) {
|
||||
serializedFromDisk = undefined;
|
||||
}
|
||||
|
||||
applySessionStoreMigrations(store);
|
||||
normalizeSessionStore(store);
|
||||
const maintenance = opts.maintenanceConfig ?? resolveMaintenanceConfig();
|
||||
const beforeCount = Object.keys(store).length;
|
||||
if (maintenance.mode === "enforce" && beforeCount > maintenance.maxEntries) {
|
||||
@@ -144,7 +145,6 @@ export function loadSessionStore(
|
||||
const afterCount = Object.keys(store).length;
|
||||
if (pruned > 0 || capped > 0) {
|
||||
serializedFromDisk = undefined;
|
||||
setSerializedSessionStore(storePath, undefined);
|
||||
log.info("applied load-time maintenance to oversized session store", {
|
||||
storePath,
|
||||
before: beforeCount,
|
||||
@@ -156,6 +156,8 @@ export function loadSessionStore(
|
||||
}
|
||||
}
|
||||
|
||||
setSerializedSessionStore(storePath, serializedFromDisk);
|
||||
|
||||
if (!opts.skipCache && isSessionStoreCacheEnabled()) {
|
||||
writeSessionStoreCache({
|
||||
storePath,
|
||||
@@ -166,5 +168,5 @@ export function loadSessionStore(
|
||||
});
|
||||
}
|
||||
|
||||
return opts.clone === false ? store : structuredClone(store);
|
||||
return opts.clone === false ? store : cloneSessionStoreRecord(store, serializedFromDisk);
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import type { SessionEntry } from "./types.js";
|
||||
|
||||
export function applySessionStoreMigrations(store: Record<string, SessionEntry>): void {
|
||||
export function applySessionStoreMigrations(store: Record<string, SessionEntry>): boolean {
|
||||
let changed = false;
|
||||
// Best-effort migration: message provider → channel naming.
|
||||
for (const entry of Object.values(store)) {
|
||||
if (!entry || typeof entry !== "object") {
|
||||
@@ -10,18 +11,23 @@ export function applySessionStoreMigrations(store: Record<string, SessionEntry>)
|
||||
if (typeof rec.channel !== "string" && typeof rec.provider === "string") {
|
||||
rec.channel = rec.provider;
|
||||
delete rec.provider;
|
||||
changed = true;
|
||||
}
|
||||
if (typeof rec.lastChannel !== "string" && typeof rec.lastProvider === "string") {
|
||||
rec.lastChannel = rec.lastProvider;
|
||||
delete rec.lastProvider;
|
||||
changed = true;
|
||||
}
|
||||
|
||||
// Best-effort migration: legacy `room` field → `groupChannel` (keep value, prune old key).
|
||||
if (typeof rec.groupChannel !== "string" && typeof rec.room === "string") {
|
||||
rec.groupChannel = rec.room;
|
||||
delete rec.room;
|
||||
changed = true;
|
||||
} else if ("room" in rec) {
|
||||
delete rec.room;
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user