From 4967bcb16bf767bf6785a3d039479fe56434b005 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 2 May 2026 12:50:01 +0100 Subject: [PATCH] fix: route session cleanup through gateway writer --- CHANGELOG.md | 2 +- docs/cli/sessions.md | 4 + .../session-management-compaction.md | 2 +- src/commands/sessions-cleanup.test.ts | 49 +++ src/commands/sessions-cleanup.ts | 340 ++++++++++++------ src/gateway/method-scopes.ts | 1 + src/gateway/protocol/index.ts | 7 + .../protocol/schema/protocol-schemas.ts | 2 + src/gateway/protocol/schema/sessions.ts | 11 + src/gateway/protocol/schema/types.ts | 1 + src/gateway/server-methods-list.ts | 1 + src/gateway/server-methods/sessions.ts | 42 +++ src/gateway/server.sessions.store-rpc.test.ts | 19 + 13 files changed, 363 insertions(+), 118 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d017c67c547..d3a822783d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,7 +38,7 @@ Docs: https://docs.openclaw.ai - Models CLI: restore `openclaw models list --provider ` catalog and registry fallback rows for unconfigured providers, so provider-specific verification commands no longer report "No models found." Fixes #75517; supersedes #75615. Thanks @lotsoftick and @koshaji. - Gateway/macOS: write LaunchAgent services with a canonical system PATH and stop preserving old plist PATH entries, so Volta, asdf, fnm, and pnpm shell paths no longer affect gateway child-process Node resolution. Fixes #75233; supersedes #75246. Thanks @nphyde2. - Slack/hooks: preserve bot alert attachment text in message-received hook content when command text is blank. Fixes #76035; refs #76036. Thanks @amsminn. -- Sessions: route Gateway session-store writes through a dedicated in-process writer and borrow the validated mutable cache during the writer slot, avoiding runtime file locks plus repeated `sessions.json` rereads and JSON clones on hot metadata updates. Refs #68554. Thanks @henkterharmsel. +- Sessions: route Gateway session-store writes and CLI cleanup maintenance through a dedicated in-process writer and borrow the validated mutable cache during the writer slot, avoiding runtime file locks plus repeated `sessions.json` rereads and JSON clones on hot metadata updates. Refs #68554. Thanks @henkterharmsel. - Control UI/chat: show inline feedback when local slash-command dispatch is unavailable or fails unexpectedly instead of clearing the composer silently. Fixes #52105. Thanks @MooreQiao. - Memory/markdown: replace CRLF managed blocks in place and collapse duplicate marker blocks without rewriting unmanaged markdown, so Dreaming and Memory Wiki files self-heal from repeated generated sections. Fixes #75491; supersedes #75495, #75810, and #76008. Thanks @asaenokkostya-coder, @ottodeng, @everettjf, and @lrg913427-dot. - Agents/tools: return critical tool-loop circuit-breaker stops as blocked tool results instead of thrown tool failures, so models see the guardrail and stop retrying the same call. Thanks @rayraiser. diff --git a/docs/cli/sessions.md b/docs/cli/sessions.md index 8f2219dfbad..32f8cf23fe5 100644 --- a/docs/cli/sessions.md +++ b/docs/cli/sessions.md @@ -98,6 +98,10 @@ openclaw sessions cleanup --json - `--store `: run against a specific `sessions.json` file. - `--json`: print a JSON summary. With `--all-agents`, output includes one summary per store. +When a Gateway is reachable, enforcing cleanup for configured agent stores is +sent through the Gateway so it shares the same session-store writer as runtime +traffic. Use `--store ` for explicit offline repair of a store file. + `openclaw sessions cleanup --all-agents --dry-run --json`: ```json diff --git a/docs/reference/session-management-compaction.md b/docs/reference/session-management-compaction.md index 520256966da..d6346b7671d 100644 --- a/docs/reference/session-management-compaction.md +++ b/docs/reference/session-management-compaction.md @@ -85,7 +85,7 @@ Session persistence has automatic maintenance controls (`session.maintenance`) f - `maxDiskBytes`: optional sessions-directory budget - `highWaterBytes`: optional target after cleanup (default `80%` of `maxDiskBytes`) -Normal Gateway writes flow through a per-store session writer that serializes in-process mutations without taking a runtime file lock. Hot-path patch helpers borrow the validated mutable cache while they hold that writer slot, so large `sessions.json` files are not cloned or reread for every metadata update. Runtime code should prefer `updateSessionStore(...)` or `updateSessionStoreEntry(...)`; direct whole-store saves are compatibility and offline-maintenance tools. `maxEntries` cleanup is still batched for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. Session store reads do not prune or cap entries during Gateway startup; use writes or `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` still applies the configured cap immediately. +Normal Gateway writes flow through a per-store session writer that serializes in-process mutations without taking a runtime file lock. Hot-path patch helpers borrow the validated mutable cache while they hold that writer slot, so large `sessions.json` files are not cloned or reread for every metadata update. Runtime code should prefer `updateSessionStore(...)` or `updateSessionStoreEntry(...)`; direct whole-store saves are compatibility and offline-maintenance tools. When a Gateway is reachable, `openclaw sessions cleanup --enforce` delegates maintenance to the Gateway so cleanup joins the same writer queue; `--store ` is the explicit offline repair path for direct file maintenance. `maxEntries` cleanup is still batched for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. Session store reads do not prune or cap entries during Gateway startup; use writes or `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` still applies the configured cap immediately. Maintenance keeps durable external conversation pointers such as group sessions and thread-scoped chat sessions, but synthetic runtime entries for cron, hooks, diff --git a/src/commands/sessions-cleanup.test.ts b/src/commands/sessions-cleanup.test.ts index 8a095665c88..8c06c3d1316 100644 --- a/src/commands/sessions-cleanup.test.ts +++ b/src/commands/sessions-cleanup.test.ts @@ -14,6 +14,8 @@ const mocks = vi.hoisted(() => ({ capEntryCount: vi.fn(), updateSessionStore: vi.fn(), enforceSessionDiskBudget: vi.fn(), + callGateway: vi.fn(), + isGatewayTransportError: vi.fn(), })); vi.mock("../config/config.js", () => ({ @@ -37,6 +39,11 @@ vi.mock("../config/sessions.js", () => ({ enforceSessionDiskBudget: mocks.enforceSessionDiskBudget, })); +vi.mock("../gateway/call.js", () => ({ + callGateway: mocks.callGateway, + isGatewayTransportError: mocks.isGatewayTransportError, +})); + import { sessionsCleanupCommand } from "./sessions-cleanup.js"; function makeRuntime(): { runtime: RuntimeEnv; logs: string[] } { @@ -97,6 +104,8 @@ describe("sessionsCleanupCommand", () => { ); mocks.capEntryCount.mockImplementation(() => 0); mocks.updateSessionStore.mockResolvedValue(0); + mocks.callGateway.mockResolvedValue(null); + mocks.isGatewayTransportError.mockReturnValue(true); mocks.enforceSessionDiskBudget.mockResolvedValue({ totalBytesBefore: 1000, totalBytesAfter: 700, @@ -110,6 +119,9 @@ describe("sessionsCleanupCommand", () => { }); it("emits a single JSON object for non-dry runs and applies maintenance", async () => { + mocks.callGateway.mockRejectedValue( + Object.assign(new Error("closed"), { name: "GatewayTransportError" }), + ); mocks.loadSessionStore .mockReturnValueOnce({ stale: { sessionId: "stale", updatedAt: 1 }, @@ -190,6 +202,43 @@ describe("sessionsCleanupCommand", () => { ); }); + it("delegates non-store enforcing cleanup through the Gateway writer when reachable", async () => { + mocks.callGateway.mockResolvedValue({ + agentId: "main", + storePath: "/resolved/sessions.json", + mode: "enforce", + dryRun: false, + beforeCount: 3, + afterCount: 1, + missing: 0, + pruned: 2, + capped: 0, + diskBudget: null, + wouldMutate: true, + applied: true, + appliedCount: 1, + }); + + const { runtime, logs } = makeRuntime(); + await sessionsCleanupCommand( + { + json: true, + enforce: true, + }, + runtime, + ); + + expect(mocks.callGateway).toHaveBeenCalledWith( + expect.objectContaining({ + method: "sessions.cleanup", + params: expect.objectContaining({ enforce: true }), + requiredMethods: ["sessions.cleanup"], + }), + ); + expect(mocks.updateSessionStore).not.toHaveBeenCalled(); + expect(JSON.parse(logs[0] ?? "{}")).toEqual(expect.objectContaining({ appliedCount: 1 })); + }); + it("returns dry-run JSON without mutating the store", async () => { mocks.loadSessionStore.mockReturnValue({ stale: { sessionId: "stale", updatedAt: 1 }, diff --git a/src/commands/sessions-cleanup.ts b/src/commands/sessions-cleanup.ts index 4fac3a16010..2c2e189c409 100644 --- a/src/commands/sessions-cleanup.ts +++ b/src/commands/sessions-cleanup.ts @@ -13,9 +13,12 @@ import { type SessionMaintenanceApplyReport, } from "../config/sessions.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { callGateway, isGatewayTransportError } from "../gateway/call.js"; import { type RuntimeEnv, writeRuntimeJson } from "../runtime.js"; import { isRich, theme } from "../terminal/theme.js"; +import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js"; import { + resolveSessionStoreTargets, resolveSessionStoreTargetsOrExit, type SessionStoreTarget, } from "./session-store-targets.js"; @@ -34,7 +37,7 @@ import { toSessionDisplayRows, } from "./sessions-table.js"; -type SessionsCleanupOptions = { +export type SessionsCleanupOptions = { store?: string; agent?: string; allAgents?: boolean; @@ -74,6 +77,15 @@ type SessionCleanupSummary = { appliedCount?: number; }; +export type SessionsCleanupResult = + | SessionCleanupSummary + | { + allAgents: true; + mode: "warn" | "enforce"; + dryRun: boolean; + stores: SessionCleanupSummary[]; + }; + function resolveSessionCleanupAction(params: { key: string; missingKeys: Set; @@ -244,6 +256,22 @@ async function previewStoreCleanup(params: { }; } +function serializeSessionCleanupResult(params: { + mode: "warn" | "enforce"; + dryRun: boolean; + summaries: SessionCleanupSummary[]; +}): SessionsCleanupResult { + if (params.summaries.length === 1) { + return params.summaries[0] ?? ({} as SessionCleanupSummary); + } + return { + allAgents: true, + mode: params.mode, + dryRun: params.dryRun, + stores: params.summaries, + }; +} + function renderStoreDryRunPlan(params: { cfg: OpenClawConfig; summary: SessionCleanupSummary; @@ -295,22 +323,47 @@ function renderStoreDryRunPlan(params: { } } -export async function sessionsCleanupCommand(opts: SessionsCleanupOptions, runtime: RuntimeEnv) { - const cfg = getRuntimeConfig(); - const displayDefaults = resolveSessionDisplayDefaults(cfg); +function renderAppliedSummaries(params: { + summaries: SessionCleanupSummary[]; + runtime: RuntimeEnv; +}) { + for (let i = 0; i < params.summaries.length; i += 1) { + const summary = params.summaries[i]; + if (!summary) { + continue; + } + if (i > 0) { + params.runtime.log(""); + } + if (params.summaries.length > 1) { + params.runtime.log(`Agent: ${summary.agentId}`); + } + params.runtime.log(`Session store: ${summary.storePath}`); + params.runtime.log(`Applied maintenance. Current entries: ${summary.appliedCount ?? 0}`); + } +} + +export async function runSessionsCleanup(params: { + cfg: OpenClawConfig; + opts: SessionsCleanupOptions; + targets?: SessionStoreTarget[]; +}): Promise<{ + mode: "warn" | "enforce"; + previewResults: Array<{ + summary: SessionCleanupSummary; + actionRows: SessionCleanupActionRow[]; + }>; + appliedSummaries: SessionCleanupSummary[]; +}> { + const { cfg, opts } = params; const mode = opts.enforce ? "enforce" : resolveMaintenanceConfig().mode; - const targets = resolveSessionStoreTargetsOrExit({ - cfg, - opts: { + const targets = + params.targets ?? + resolveSessionStoreTargets(cfg, { store: opts.store, agent: opts.agent, allAgents: opts.allAgents, - }, - runtime, - }); - if (!targets) { - return; - } + }); const previewResults: Array<{ summary: SessionCleanupSummary; @@ -327,18 +380,157 @@ export async function sessionsCleanupCommand(opts: SessionsCleanupOptions, runti previewResults.push(result); } + const appliedSummaries: SessionCleanupSummary[] = []; + if (!opts.dryRun) { + for (const target of targets) { + const appliedReportRef: { current: SessionMaintenanceApplyReport | null } = { + current: null, + }; + const missingApplied = await updateSessionStore( + target.storePath, + async (store) => { + if (!opts.fixMissing) { + return 0; + } + return pruneMissingTranscriptEntries({ + store, + storePath: target.storePath, + }); + }, + { + activeSessionKey: opts.activeKey, + maintenanceOverride: { + mode, + }, + onMaintenanceApplied: (report) => { + appliedReportRef.current = report; + }, + }, + ); + const afterStore = loadSessionStore(target.storePath, { skipCache: true }); + const preview = previewResults.find( + (result) => result.summary.storePath === target.storePath, + ); + const appliedReport = appliedReportRef.current; + const summary: SessionCleanupSummary = + appliedReport === null + ? { + ...(preview?.summary ?? { + agentId: target.agentId, + storePath: target.storePath, + mode, + dryRun: false, + beforeCount: 0, + afterCount: 0, + missing: 0, + pruned: 0, + capped: 0, + diskBudget: null, + wouldMutate: false, + }), + dryRun: false, + applied: true, + appliedCount: Object.keys(afterStore).length, + } + : { + agentId: target.agentId, + storePath: target.storePath, + mode: appliedReport.mode, + dryRun: false, + beforeCount: appliedReport.beforeCount, + afterCount: appliedReport.afterCount, + missing: missingApplied, + pruned: appliedReport.pruned, + capped: appliedReport.capped, + diskBudget: appliedReport.diskBudget, + wouldMutate: + missingApplied > 0 || + appliedReport.pruned > 0 || + appliedReport.capped > 0 || + (appliedReport.diskBudget?.removedEntries ?? 0) > 0 || + (appliedReport.diskBudget?.removedFiles ?? 0) > 0, + applied: true, + appliedCount: Object.keys(afterStore).length, + }; + appliedSummaries.push(summary); + } + } + + return { mode, previewResults, appliedSummaries }; +} + +async function maybeRunGatewayCleanup( + opts: SessionsCleanupOptions, +): Promise { + if (opts.store || opts.dryRun) { + return null; + } + try { + return await callGateway({ + method: "sessions.cleanup", + params: { + agent: opts.agent, + allAgents: opts.allAgents, + enforce: opts.enforce, + activeKey: opts.activeKey, + fixMissing: opts.fixMissing, + }, + mode: GATEWAY_CLIENT_MODES.CLI, + clientName: GATEWAY_CLIENT_NAMES.CLI, + requiredMethods: ["sessions.cleanup"], + }); + } catch (error) { + if (isGatewayTransportError(error)) { + return null; + } + throw error; + } +} + +export async function sessionsCleanupCommand(opts: SessionsCleanupOptions, runtime: RuntimeEnv) { + const gatewayResult = await maybeRunGatewayCleanup(opts); + if (gatewayResult) { + if (opts.json) { + writeRuntimeJson(runtime, gatewayResult); + return; + } + renderAppliedSummaries({ + summaries: "stores" in gatewayResult ? gatewayResult.stores : [gatewayResult], + runtime, + }); + return; + } + + const cfg = getRuntimeConfig(); + const displayDefaults = resolveSessionDisplayDefaults(cfg); + const targets = resolveSessionStoreTargetsOrExit({ + cfg, + opts: { + store: opts.store, + agent: opts.agent, + allAgents: opts.allAgents, + }, + runtime, + }); + if (!targets) { + return; + } + const { mode, previewResults, appliedSummaries } = await runSessionsCleanup({ + cfg, + opts, + targets, + }); + if (opts.dryRun) { if (opts.json) { - if (previewResults.length === 1) { - writeRuntimeJson(runtime, previewResults[0]?.summary ?? {}); - return; - } - writeRuntimeJson(runtime, { - allAgents: true, - mode, - dryRun: true, - stores: previewResults.map((result) => result.summary), - }); + writeRuntimeJson( + runtime, + serializeSessionCleanupResult({ + mode, + dryRun: true, + summaries: previewResults.map((result) => result.summary), + }), + ); return; } @@ -359,101 +551,17 @@ export async function sessionsCleanupCommand(opts: SessionsCleanupOptions, runti return; } - const appliedSummaries: SessionCleanupSummary[] = []; - for (const target of targets) { - const appliedReportRef: { current: SessionMaintenanceApplyReport | null } = { - current: null, - }; - const missingApplied = await updateSessionStore( - target.storePath, - async (store) => { - if (!opts.fixMissing) { - return 0; - } - return pruneMissingTranscriptEntries({ - store, - storePath: target.storePath, - }); - }, - { - activeSessionKey: opts.activeKey, - maintenanceOverride: { - mode, - }, - onMaintenanceApplied: (report) => { - appliedReportRef.current = report; - }, - }, - ); - const afterStore = loadSessionStore(target.storePath, { skipCache: true }); - const preview = previewResults.find((result) => result.summary.storePath === target.storePath); - const appliedReport = appliedReportRef.current; - const summary: SessionCleanupSummary = - appliedReport === null - ? { - ...(preview?.summary ?? { - agentId: target.agentId, - storePath: target.storePath, - mode, - dryRun: false, - beforeCount: 0, - afterCount: 0, - missing: 0, - pruned: 0, - capped: 0, - diskBudget: null, - wouldMutate: false, - }), - dryRun: false, - applied: true, - appliedCount: Object.keys(afterStore).length, - } - : { - agentId: target.agentId, - storePath: target.storePath, - mode: appliedReport.mode, - dryRun: false, - beforeCount: appliedReport.beforeCount, - afterCount: appliedReport.afterCount, - missing: missingApplied, - pruned: appliedReport.pruned, - capped: appliedReport.capped, - diskBudget: appliedReport.diskBudget, - wouldMutate: - missingApplied > 0 || - appliedReport.pruned > 0 || - appliedReport.capped > 0 || - (appliedReport.diskBudget?.removedEntries ?? 0) > 0 || - (appliedReport.diskBudget?.removedFiles ?? 0) > 0, - applied: true, - appliedCount: Object.keys(afterStore).length, - }; - appliedSummaries.push(summary); - } - if (opts.json) { - if (appliedSummaries.length === 1) { - writeRuntimeJson(runtime, appliedSummaries[0] ?? {}); - return; - } - writeRuntimeJson(runtime, { - allAgents: true, - mode, - dryRun: false, - stores: appliedSummaries, - }); + writeRuntimeJson( + runtime, + serializeSessionCleanupResult({ + mode, + dryRun: false, + summaries: appliedSummaries, + }), + ); return; } - for (let i = 0; i < appliedSummaries.length; i += 1) { - const summary = appliedSummaries[i]; - if (i > 0) { - runtime.log(""); - } - if (appliedSummaries.length > 1) { - runtime.log(`Agent: ${summary.agentId}`); - } - runtime.log(`Session store: ${summary.storePath}`); - runtime.log(`Applied maintenance. Current entries: ${summary.appliedCount ?? 0}`); - } + renderAppliedSummaries({ summaries: appliedSummaries, runtime }); } diff --git a/src/gateway/method-scopes.ts b/src/gateway/method-scopes.ts index 78ea6c6770f..016c9866aa1 100644 --- a/src/gateway/method-scopes.ts +++ b/src/gateway/method-scopes.ts @@ -184,6 +184,7 @@ const METHOD_SCOPE_GROUPS: Record = { "cron.run", "sessions.patch", "sessions.pluginPatch", + "sessions.cleanup", "sessions.reset", "sessions.delete", "sessions.compact", diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index 806355c30c8..262303a832e 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -256,6 +256,8 @@ import { SessionsAbortParamsSchema, type SessionsCompactParams, SessionsCompactParamsSchema, + type SessionsCleanupParams, + SessionsCleanupParamsSchema, type SessionsCompactionBranchParams, SessionsCompactionBranchParamsSchema, type SessionsCompactionGetParams, @@ -444,6 +446,9 @@ export const validateSecretsResolveResult = ajv.compile( SecretsResolveResultSchema, ); export const validateSessionsListParams = ajv.compile(SessionsListParamsSchema); +export const validateSessionsCleanupParams = ajv.compile( + SessionsCleanupParamsSchema, +); export const validateSessionsPreviewParams = ajv.compile( SessionsPreviewParamsSchema, ); @@ -693,6 +698,7 @@ export { NodePendingEnqueueParamsSchema, NodePendingEnqueueResultSchema, SessionsListParamsSchema, + SessionsCleanupParamsSchema, SessionsPreviewParamsSchema, SessionsResolveParamsSchema, SessionsCompactionListParamsSchema, @@ -918,6 +924,7 @@ export type { NodePendingEnqueueParams, NodePendingEnqueueResult, SessionsListParams, + SessionsCleanupParams, SessionsPreviewParams, SessionsResolveParams, SessionsPatchParams, diff --git a/src/gateway/protocol/schema/protocol-schemas.ts b/src/gateway/protocol/schema/protocol-schemas.ts index 19a27eb3eb6..0049b1880d9 100644 --- a/src/gateway/protocol/schema/protocol-schemas.ts +++ b/src/gateway/protocol/schema/protocol-schemas.ts @@ -201,6 +201,7 @@ import { SessionsCompactionRestoreParamsSchema, SessionsCompactionRestoreResultSchema, SessionCompactionCheckpointSchema, + SessionsCleanupParamsSchema, SessionsCreateParamsSchema, SessionsDeleteParamsSchema, SessionsListParamsSchema, @@ -275,6 +276,7 @@ export const ProtocolSchemas = { SecretsResolveAssignment: SecretsResolveAssignmentSchema, SecretsResolveResult: SecretsResolveResultSchema, SessionsListParams: SessionsListParamsSchema, + SessionsCleanupParams: SessionsCleanupParamsSchema, SessionsPreviewParams: SessionsPreviewParamsSchema, SessionsResolveParams: SessionsResolveParamsSchema, SessionCompactionCheckpoint: SessionCompactionCheckpointSchema, diff --git a/src/gateway/protocol/schema/sessions.ts b/src/gateway/protocol/schema/sessions.ts index 6eb8dfd860a..20082fd09f6 100644 --- a/src/gateway/protocol/schema/sessions.ts +++ b/src/gateway/protocol/schema/sessions.ts @@ -60,6 +60,17 @@ export const SessionsListParamsSchema = Type.Object( { additionalProperties: false }, ); +export const SessionsCleanupParamsSchema = Type.Object( + { + agent: Type.Optional(NonEmptyString), + allAgents: Type.Optional(Type.Boolean()), + enforce: Type.Optional(Type.Boolean()), + activeKey: Type.Optional(NonEmptyString), + fixMissing: Type.Optional(Type.Boolean()), + }, + { additionalProperties: false }, +); + export const SessionsPreviewParamsSchema = Type.Object( { keys: Type.Array(NonEmptyString, { minItems: 1 }), diff --git a/src/gateway/protocol/schema/types.ts b/src/gateway/protocol/schema/types.ts index 153353792ad..540d378e9af 100644 --- a/src/gateway/protocol/schema/types.ts +++ b/src/gateway/protocol/schema/types.ts @@ -44,6 +44,7 @@ export type NodePendingEnqueueResult = SchemaType<"NodePendingEnqueueResult">; export type PushTestParams = SchemaType<"PushTestParams">; export type PushTestResult = SchemaType<"PushTestResult">; export type SessionsListParams = SchemaType<"SessionsListParams">; +export type SessionsCleanupParams = SchemaType<"SessionsCleanupParams">; export type SessionsPreviewParams = SchemaType<"SessionsPreviewParams">; export type SessionsResolveParams = SchemaType<"SessionsResolveParams">; export type SessionCompactionCheckpoint = SchemaType<"SessionCompactionCheckpoint">; diff --git a/src/gateway/server-methods-list.ts b/src/gateway/server-methods-list.ts index 160c84579b2..3952c76e8e8 100644 --- a/src/gateway/server-methods-list.ts +++ b/src/gateway/server-methods-list.ts @@ -105,6 +105,7 @@ const BASE_METHODS = [ "sessions.abort", "sessions.patch", "sessions.pluginPatch", + "sessions.cleanup", "sessions.reset", "sessions.delete", "sessions.compact", diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index 54bfa66ba4a..89be41b2a09 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -12,6 +12,7 @@ import { import { compactEmbeddedPiSession } from "../../agents/pi-embedded.js"; import { clearSessionQueues } from "../../auto-reply/reply/queue/cleanup.js"; import { normalizeReasoningLevel, normalizeThinkLevel } from "../../auto-reply/thinking.js"; +import { runSessionsCleanup } from "../../commands/sessions-cleanup.js"; import { loadSessionStore, resolveMainSessionKey, @@ -47,6 +48,7 @@ import { ErrorCodes, errorShape, validateSessionsAbortParams, + validateSessionsCleanupParams, validateSessionsCompactParams, validateSessionsCompactionBranchParams, validateSessionsCompactionGetParams, @@ -663,6 +665,46 @@ export const sessionsHandlers: GatewayRequestHandlers = { }); respond(true, result, undefined); }, + "sessions.cleanup": async ({ params, respond, context }) => { + if (!assertValidParams(params, validateSessionsCleanupParams, "sessions.cleanup", respond)) { + return; + } + try { + const { mode, appliedSummaries } = await runSessionsCleanup({ + cfg: context.getRuntimeConfig(), + opts: { + agent: params.agent, + allAgents: params.allAgents, + enforce: params.enforce, + activeKey: params.activeKey, + fixMissing: params.fixMissing, + }, + }); + const result = + appliedSummaries.length === 1 + ? (appliedSummaries[0] ?? {}) + : { + allAgents: true, + mode, + dryRun: false, + stores: appliedSummaries, + }; + respond(true, result, undefined); + for (const summary of appliedSummaries) { + emitSessionsChanged(context, { + reason: "cleanup", + sessionKey: undefined, + }); + if (summary.wouldMutate) { + context.logGateway.debug( + `sessions.cleanup applied ${summary.storePath}: ${summary.beforeCount} -> ${summary.afterCount}`, + ); + } + } + } catch (error) { + respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, formatErrorMessage(error))); + } + }, "sessions.subscribe": ({ client, context, respond }) => { const connId = client?.connId?.trim(); if (connId) { diff --git a/src/gateway/server.sessions.store-rpc.test.ts b/src/gateway/server.sessions.store-rpc.test.ts index 1cc7728dac9..621b6678944 100644 --- a/src/gateway/server.sessions.store-rpc.test.ts +++ b/src/gateway/server.sessions.store-rpc.test.ts @@ -67,6 +67,7 @@ test("lists and patches session store via sessions.* RPC", async () => { expect.arrayContaining([ "sessions.list", "sessions.preview", + "sessions.cleanup", "sessions.patch", "sessions.reset", "sessions.delete", @@ -78,6 +79,7 @@ test("lists and patches session store via sessions.* RPC", async () => { const directContext = { broadcastToConnIds: vi.fn(), getSessionEventSubscriberConnIds: () => new Set(), + logGateway: { debug: vi.fn() }, loadGatewayModelCatalog: async () => piSdkMock.models, getRuntimeConfig: getRuntimeConfig, } as never; @@ -305,6 +307,23 @@ test("lists and patches session store via sessions.* RPC", async () => { }); expect(spawnedPatchedInvalidKey.ok).toBe(false); + const cleaned = await directSessionReq<{ + applied: true; + missing: number; + appliedCount: number; + }>("sessions.cleanup", { + enforce: true, + fixMissing: true, + }); + expect(cleaned.ok).toBe(true); + expect(cleaned.payload?.missing).toBeGreaterThanOrEqual(1); + const listAfterCleanup = await directSessionReq<{ + sessions: Array<{ key: string }>; + }>("sessions.list", {}); + expect(listAfterCleanup.payload?.sessions.some((s) => s.key === "agent:main:subagent:one")).toBe( + false, + ); + piSdkMock.enabled = true; piSdkMock.models = [{ id: "gpt-test-a", name: "A", provider: "openai" }]; const modelPatched = await directSessionReq<{