mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:50:43 +00:00
fix: route session cleanup through gateway writer
This commit is contained in:
@@ -38,7 +38,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Models CLI: restore `openclaw models list --provider <id>` catalog and registry fallback rows for unconfigured providers, so provider-specific verification commands no longer report "No models found." Fixes #75517; supersedes #75615. Thanks @lotsoftick and @koshaji.
|
||||
- Gateway/macOS: write LaunchAgent services with a canonical system PATH and stop preserving old plist PATH entries, so Volta, asdf, fnm, and pnpm shell paths no longer affect gateway child-process Node resolution. Fixes #75233; supersedes #75246. Thanks @nphyde2.
|
||||
- Slack/hooks: preserve bot alert attachment text in message-received hook content when command text is blank. Fixes #76035; refs #76036. Thanks @amsminn.
|
||||
- Sessions: route Gateway session-store writes through a dedicated in-process writer and borrow the validated mutable cache during the writer slot, avoiding runtime file locks plus repeated `sessions.json` rereads and JSON clones on hot metadata updates. Refs #68554. Thanks @henkterharmsel.
|
||||
- Sessions: route Gateway session-store writes and CLI cleanup maintenance through a dedicated in-process writer and borrow the validated mutable cache during the writer slot, avoiding runtime file locks plus repeated `sessions.json` rereads and JSON clones on hot metadata updates. Refs #68554. Thanks @henkterharmsel.
|
||||
- Control UI/chat: show inline feedback when local slash-command dispatch is unavailable or fails unexpectedly instead of clearing the composer silently. Fixes #52105. Thanks @MooreQiao.
|
||||
- Memory/markdown: replace CRLF managed blocks in place and collapse duplicate marker blocks without rewriting unmanaged markdown, so Dreaming and Memory Wiki files self-heal from repeated generated sections. Fixes #75491; supersedes #75495, #75810, and #76008. Thanks @asaenokkostya-coder, @ottodeng, @everettjf, and @lrg913427-dot.
|
||||
- Agents/tools: return critical tool-loop circuit-breaker stops as blocked tool results instead of thrown tool failures, so models see the guardrail and stop retrying the same call. Thanks @rayraiser.
|
||||
|
||||
@@ -98,6 +98,10 @@ openclaw sessions cleanup --json
|
||||
- `--store <path>`: run against a specific `sessions.json` file.
|
||||
- `--json`: print a JSON summary. With `--all-agents`, output includes one summary per store.
|
||||
|
||||
When a Gateway is reachable, enforcing cleanup for configured agent stores is
|
||||
sent through the Gateway so it shares the same session-store writer as runtime
|
||||
traffic. Use `--store <path>` for explicit offline repair of a store file.
|
||||
|
||||
`openclaw sessions cleanup --all-agents --dry-run --json`:
|
||||
|
||||
```json
|
||||
|
||||
@@ -85,7 +85,7 @@ Session persistence has automatic maintenance controls (`session.maintenance`) f
|
||||
- `maxDiskBytes`: optional sessions-directory budget
|
||||
- `highWaterBytes`: optional target after cleanup (default `80%` of `maxDiskBytes`)
|
||||
|
||||
Normal Gateway writes flow through a per-store session writer that serializes in-process mutations without taking a runtime file lock. Hot-path patch helpers borrow the validated mutable cache while they hold that writer slot, so large `sessions.json` files are not cloned or reread for every metadata update. Runtime code should prefer `updateSessionStore(...)` or `updateSessionStoreEntry(...)`; direct whole-store saves are compatibility and offline-maintenance tools. `maxEntries` cleanup is still batched for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. Session store reads do not prune or cap entries during Gateway startup; use writes or `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` still applies the configured cap immediately.
|
||||
Normal Gateway writes flow through a per-store session writer that serializes in-process mutations without taking a runtime file lock. Hot-path patch helpers borrow the validated mutable cache while they hold that writer slot, so large `sessions.json` files are not cloned or reread for every metadata update. Runtime code should prefer `updateSessionStore(...)` or `updateSessionStoreEntry(...)`; direct whole-store saves are compatibility and offline-maintenance tools. When a Gateway is reachable, `openclaw sessions cleanup --enforce` delegates maintenance to the Gateway so cleanup joins the same writer queue; `--store <path>` is the explicit offline repair path for direct file maintenance. `maxEntries` cleanup is still batched for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. Session store reads do not prune or cap entries during Gateway startup; use writes or `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` still applies the configured cap immediately.
|
||||
|
||||
Maintenance keeps durable external conversation pointers such as group sessions
|
||||
and thread-scoped chat sessions, but synthetic runtime entries for cron, hooks,
|
||||
|
||||
@@ -14,6 +14,8 @@ const mocks = vi.hoisted(() => ({
|
||||
capEntryCount: vi.fn(),
|
||||
updateSessionStore: vi.fn(),
|
||||
enforceSessionDiskBudget: vi.fn(),
|
||||
callGateway: vi.fn(),
|
||||
isGatewayTransportError: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../config/config.js", () => ({
|
||||
@@ -37,6 +39,11 @@ vi.mock("../config/sessions.js", () => ({
|
||||
enforceSessionDiskBudget: mocks.enforceSessionDiskBudget,
|
||||
}));
|
||||
|
||||
vi.mock("../gateway/call.js", () => ({
|
||||
callGateway: mocks.callGateway,
|
||||
isGatewayTransportError: mocks.isGatewayTransportError,
|
||||
}));
|
||||
|
||||
import { sessionsCleanupCommand } from "./sessions-cleanup.js";
|
||||
|
||||
function makeRuntime(): { runtime: RuntimeEnv; logs: string[] } {
|
||||
@@ -97,6 +104,8 @@ describe("sessionsCleanupCommand", () => {
|
||||
);
|
||||
mocks.capEntryCount.mockImplementation(() => 0);
|
||||
mocks.updateSessionStore.mockResolvedValue(0);
|
||||
mocks.callGateway.mockResolvedValue(null);
|
||||
mocks.isGatewayTransportError.mockReturnValue(true);
|
||||
mocks.enforceSessionDiskBudget.mockResolvedValue({
|
||||
totalBytesBefore: 1000,
|
||||
totalBytesAfter: 700,
|
||||
@@ -110,6 +119,9 @@ describe("sessionsCleanupCommand", () => {
|
||||
});
|
||||
|
||||
it("emits a single JSON object for non-dry runs and applies maintenance", async () => {
|
||||
mocks.callGateway.mockRejectedValue(
|
||||
Object.assign(new Error("closed"), { name: "GatewayTransportError" }),
|
||||
);
|
||||
mocks.loadSessionStore
|
||||
.mockReturnValueOnce({
|
||||
stale: { sessionId: "stale", updatedAt: 1 },
|
||||
@@ -190,6 +202,43 @@ describe("sessionsCleanupCommand", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("delegates non-store enforcing cleanup through the Gateway writer when reachable", async () => {
|
||||
mocks.callGateway.mockResolvedValue({
|
||||
agentId: "main",
|
||||
storePath: "/resolved/sessions.json",
|
||||
mode: "enforce",
|
||||
dryRun: false,
|
||||
beforeCount: 3,
|
||||
afterCount: 1,
|
||||
missing: 0,
|
||||
pruned: 2,
|
||||
capped: 0,
|
||||
diskBudget: null,
|
||||
wouldMutate: true,
|
||||
applied: true,
|
||||
appliedCount: 1,
|
||||
});
|
||||
|
||||
const { runtime, logs } = makeRuntime();
|
||||
await sessionsCleanupCommand(
|
||||
{
|
||||
json: true,
|
||||
enforce: true,
|
||||
},
|
||||
runtime,
|
||||
);
|
||||
|
||||
expect(mocks.callGateway).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
method: "sessions.cleanup",
|
||||
params: expect.objectContaining({ enforce: true }),
|
||||
requiredMethods: ["sessions.cleanup"],
|
||||
}),
|
||||
);
|
||||
expect(mocks.updateSessionStore).not.toHaveBeenCalled();
|
||||
expect(JSON.parse(logs[0] ?? "{}")).toEqual(expect.objectContaining({ appliedCount: 1 }));
|
||||
});
|
||||
|
||||
it("returns dry-run JSON without mutating the store", async () => {
|
||||
mocks.loadSessionStore.mockReturnValue({
|
||||
stale: { sessionId: "stale", updatedAt: 1 },
|
||||
|
||||
@@ -13,9 +13,12 @@ import {
|
||||
type SessionMaintenanceApplyReport,
|
||||
} from "../config/sessions.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { callGateway, isGatewayTransportError } from "../gateway/call.js";
|
||||
import { type RuntimeEnv, writeRuntimeJson } from "../runtime.js";
|
||||
import { isRich, theme } from "../terminal/theme.js";
|
||||
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
|
||||
import {
|
||||
resolveSessionStoreTargets,
|
||||
resolveSessionStoreTargetsOrExit,
|
||||
type SessionStoreTarget,
|
||||
} from "./session-store-targets.js";
|
||||
@@ -34,7 +37,7 @@ import {
|
||||
toSessionDisplayRows,
|
||||
} from "./sessions-table.js";
|
||||
|
||||
type SessionsCleanupOptions = {
|
||||
export type SessionsCleanupOptions = {
|
||||
store?: string;
|
||||
agent?: string;
|
||||
allAgents?: boolean;
|
||||
@@ -74,6 +77,15 @@ type SessionCleanupSummary = {
|
||||
appliedCount?: number;
|
||||
};
|
||||
|
||||
export type SessionsCleanupResult =
|
||||
| SessionCleanupSummary
|
||||
| {
|
||||
allAgents: true;
|
||||
mode: "warn" | "enforce";
|
||||
dryRun: boolean;
|
||||
stores: SessionCleanupSummary[];
|
||||
};
|
||||
|
||||
function resolveSessionCleanupAction(params: {
|
||||
key: string;
|
||||
missingKeys: Set<string>;
|
||||
@@ -244,6 +256,22 @@ async function previewStoreCleanup(params: {
|
||||
};
|
||||
}
|
||||
|
||||
function serializeSessionCleanupResult(params: {
|
||||
mode: "warn" | "enforce";
|
||||
dryRun: boolean;
|
||||
summaries: SessionCleanupSummary[];
|
||||
}): SessionsCleanupResult {
|
||||
if (params.summaries.length === 1) {
|
||||
return params.summaries[0] ?? ({} as SessionCleanupSummary);
|
||||
}
|
||||
return {
|
||||
allAgents: true,
|
||||
mode: params.mode,
|
||||
dryRun: params.dryRun,
|
||||
stores: params.summaries,
|
||||
};
|
||||
}
|
||||
|
||||
function renderStoreDryRunPlan(params: {
|
||||
cfg: OpenClawConfig;
|
||||
summary: SessionCleanupSummary;
|
||||
@@ -295,22 +323,47 @@ function renderStoreDryRunPlan(params: {
|
||||
}
|
||||
}
|
||||
|
||||
export async function sessionsCleanupCommand(opts: SessionsCleanupOptions, runtime: RuntimeEnv) {
|
||||
const cfg = getRuntimeConfig();
|
||||
const displayDefaults = resolveSessionDisplayDefaults(cfg);
|
||||
function renderAppliedSummaries(params: {
|
||||
summaries: SessionCleanupSummary[];
|
||||
runtime: RuntimeEnv;
|
||||
}) {
|
||||
for (let i = 0; i < params.summaries.length; i += 1) {
|
||||
const summary = params.summaries[i];
|
||||
if (!summary) {
|
||||
continue;
|
||||
}
|
||||
if (i > 0) {
|
||||
params.runtime.log("");
|
||||
}
|
||||
if (params.summaries.length > 1) {
|
||||
params.runtime.log(`Agent: ${summary.agentId}`);
|
||||
}
|
||||
params.runtime.log(`Session store: ${summary.storePath}`);
|
||||
params.runtime.log(`Applied maintenance. Current entries: ${summary.appliedCount ?? 0}`);
|
||||
}
|
||||
}
|
||||
|
||||
export async function runSessionsCleanup(params: {
|
||||
cfg: OpenClawConfig;
|
||||
opts: SessionsCleanupOptions;
|
||||
targets?: SessionStoreTarget[];
|
||||
}): Promise<{
|
||||
mode: "warn" | "enforce";
|
||||
previewResults: Array<{
|
||||
summary: SessionCleanupSummary;
|
||||
actionRows: SessionCleanupActionRow[];
|
||||
}>;
|
||||
appliedSummaries: SessionCleanupSummary[];
|
||||
}> {
|
||||
const { cfg, opts } = params;
|
||||
const mode = opts.enforce ? "enforce" : resolveMaintenanceConfig().mode;
|
||||
const targets = resolveSessionStoreTargetsOrExit({
|
||||
cfg,
|
||||
opts: {
|
||||
const targets =
|
||||
params.targets ??
|
||||
resolveSessionStoreTargets(cfg, {
|
||||
store: opts.store,
|
||||
agent: opts.agent,
|
||||
allAgents: opts.allAgents,
|
||||
},
|
||||
runtime,
|
||||
});
|
||||
if (!targets) {
|
||||
return;
|
||||
}
|
||||
});
|
||||
|
||||
const previewResults: Array<{
|
||||
summary: SessionCleanupSummary;
|
||||
@@ -327,18 +380,157 @@ export async function sessionsCleanupCommand(opts: SessionsCleanupOptions, runti
|
||||
previewResults.push(result);
|
||||
}
|
||||
|
||||
const appliedSummaries: SessionCleanupSummary[] = [];
|
||||
if (!opts.dryRun) {
|
||||
for (const target of targets) {
|
||||
const appliedReportRef: { current: SessionMaintenanceApplyReport | null } = {
|
||||
current: null,
|
||||
};
|
||||
const missingApplied = await updateSessionStore(
|
||||
target.storePath,
|
||||
async (store) => {
|
||||
if (!opts.fixMissing) {
|
||||
return 0;
|
||||
}
|
||||
return pruneMissingTranscriptEntries({
|
||||
store,
|
||||
storePath: target.storePath,
|
||||
});
|
||||
},
|
||||
{
|
||||
activeSessionKey: opts.activeKey,
|
||||
maintenanceOverride: {
|
||||
mode,
|
||||
},
|
||||
onMaintenanceApplied: (report) => {
|
||||
appliedReportRef.current = report;
|
||||
},
|
||||
},
|
||||
);
|
||||
const afterStore = loadSessionStore(target.storePath, { skipCache: true });
|
||||
const preview = previewResults.find(
|
||||
(result) => result.summary.storePath === target.storePath,
|
||||
);
|
||||
const appliedReport = appliedReportRef.current;
|
||||
const summary: SessionCleanupSummary =
|
||||
appliedReport === null
|
||||
? {
|
||||
...(preview?.summary ?? {
|
||||
agentId: target.agentId,
|
||||
storePath: target.storePath,
|
||||
mode,
|
||||
dryRun: false,
|
||||
beforeCount: 0,
|
||||
afterCount: 0,
|
||||
missing: 0,
|
||||
pruned: 0,
|
||||
capped: 0,
|
||||
diskBudget: null,
|
||||
wouldMutate: false,
|
||||
}),
|
||||
dryRun: false,
|
||||
applied: true,
|
||||
appliedCount: Object.keys(afterStore).length,
|
||||
}
|
||||
: {
|
||||
agentId: target.agentId,
|
||||
storePath: target.storePath,
|
||||
mode: appliedReport.mode,
|
||||
dryRun: false,
|
||||
beforeCount: appliedReport.beforeCount,
|
||||
afterCount: appliedReport.afterCount,
|
||||
missing: missingApplied,
|
||||
pruned: appliedReport.pruned,
|
||||
capped: appliedReport.capped,
|
||||
diskBudget: appliedReport.diskBudget,
|
||||
wouldMutate:
|
||||
missingApplied > 0 ||
|
||||
appliedReport.pruned > 0 ||
|
||||
appliedReport.capped > 0 ||
|
||||
(appliedReport.diskBudget?.removedEntries ?? 0) > 0 ||
|
||||
(appliedReport.diskBudget?.removedFiles ?? 0) > 0,
|
||||
applied: true,
|
||||
appliedCount: Object.keys(afterStore).length,
|
||||
};
|
||||
appliedSummaries.push(summary);
|
||||
}
|
||||
}
|
||||
|
||||
return { mode, previewResults, appliedSummaries };
|
||||
}
|
||||
|
||||
async function maybeRunGatewayCleanup(
|
||||
opts: SessionsCleanupOptions,
|
||||
): Promise<SessionsCleanupResult | null> {
|
||||
if (opts.store || opts.dryRun) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return await callGateway<SessionsCleanupResult>({
|
||||
method: "sessions.cleanup",
|
||||
params: {
|
||||
agent: opts.agent,
|
||||
allAgents: opts.allAgents,
|
||||
enforce: opts.enforce,
|
||||
activeKey: opts.activeKey,
|
||||
fixMissing: opts.fixMissing,
|
||||
},
|
||||
mode: GATEWAY_CLIENT_MODES.CLI,
|
||||
clientName: GATEWAY_CLIENT_NAMES.CLI,
|
||||
requiredMethods: ["sessions.cleanup"],
|
||||
});
|
||||
} catch (error) {
|
||||
if (isGatewayTransportError(error)) {
|
||||
return null;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
export async function sessionsCleanupCommand(opts: SessionsCleanupOptions, runtime: RuntimeEnv) {
|
||||
const gatewayResult = await maybeRunGatewayCleanup(opts);
|
||||
if (gatewayResult) {
|
||||
if (opts.json) {
|
||||
writeRuntimeJson(runtime, gatewayResult);
|
||||
return;
|
||||
}
|
||||
renderAppliedSummaries({
|
||||
summaries: "stores" in gatewayResult ? gatewayResult.stores : [gatewayResult],
|
||||
runtime,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const cfg = getRuntimeConfig();
|
||||
const displayDefaults = resolveSessionDisplayDefaults(cfg);
|
||||
const targets = resolveSessionStoreTargetsOrExit({
|
||||
cfg,
|
||||
opts: {
|
||||
store: opts.store,
|
||||
agent: opts.agent,
|
||||
allAgents: opts.allAgents,
|
||||
},
|
||||
runtime,
|
||||
});
|
||||
if (!targets) {
|
||||
return;
|
||||
}
|
||||
const { mode, previewResults, appliedSummaries } = await runSessionsCleanup({
|
||||
cfg,
|
||||
opts,
|
||||
targets,
|
||||
});
|
||||
|
||||
if (opts.dryRun) {
|
||||
if (opts.json) {
|
||||
if (previewResults.length === 1) {
|
||||
writeRuntimeJson(runtime, previewResults[0]?.summary ?? {});
|
||||
return;
|
||||
}
|
||||
writeRuntimeJson(runtime, {
|
||||
allAgents: true,
|
||||
mode,
|
||||
dryRun: true,
|
||||
stores: previewResults.map((result) => result.summary),
|
||||
});
|
||||
writeRuntimeJson(
|
||||
runtime,
|
||||
serializeSessionCleanupResult({
|
||||
mode,
|
||||
dryRun: true,
|
||||
summaries: previewResults.map((result) => result.summary),
|
||||
}),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -359,101 +551,17 @@ export async function sessionsCleanupCommand(opts: SessionsCleanupOptions, runti
|
||||
return;
|
||||
}
|
||||
|
||||
const appliedSummaries: SessionCleanupSummary[] = [];
|
||||
for (const target of targets) {
|
||||
const appliedReportRef: { current: SessionMaintenanceApplyReport | null } = {
|
||||
current: null,
|
||||
};
|
||||
const missingApplied = await updateSessionStore(
|
||||
target.storePath,
|
||||
async (store) => {
|
||||
if (!opts.fixMissing) {
|
||||
return 0;
|
||||
}
|
||||
return pruneMissingTranscriptEntries({
|
||||
store,
|
||||
storePath: target.storePath,
|
||||
});
|
||||
},
|
||||
{
|
||||
activeSessionKey: opts.activeKey,
|
||||
maintenanceOverride: {
|
||||
mode,
|
||||
},
|
||||
onMaintenanceApplied: (report) => {
|
||||
appliedReportRef.current = report;
|
||||
},
|
||||
},
|
||||
);
|
||||
const afterStore = loadSessionStore(target.storePath, { skipCache: true });
|
||||
const preview = previewResults.find((result) => result.summary.storePath === target.storePath);
|
||||
const appliedReport = appliedReportRef.current;
|
||||
const summary: SessionCleanupSummary =
|
||||
appliedReport === null
|
||||
? {
|
||||
...(preview?.summary ?? {
|
||||
agentId: target.agentId,
|
||||
storePath: target.storePath,
|
||||
mode,
|
||||
dryRun: false,
|
||||
beforeCount: 0,
|
||||
afterCount: 0,
|
||||
missing: 0,
|
||||
pruned: 0,
|
||||
capped: 0,
|
||||
diskBudget: null,
|
||||
wouldMutate: false,
|
||||
}),
|
||||
dryRun: false,
|
||||
applied: true,
|
||||
appliedCount: Object.keys(afterStore).length,
|
||||
}
|
||||
: {
|
||||
agentId: target.agentId,
|
||||
storePath: target.storePath,
|
||||
mode: appliedReport.mode,
|
||||
dryRun: false,
|
||||
beforeCount: appliedReport.beforeCount,
|
||||
afterCount: appliedReport.afterCount,
|
||||
missing: missingApplied,
|
||||
pruned: appliedReport.pruned,
|
||||
capped: appliedReport.capped,
|
||||
diskBudget: appliedReport.diskBudget,
|
||||
wouldMutate:
|
||||
missingApplied > 0 ||
|
||||
appliedReport.pruned > 0 ||
|
||||
appliedReport.capped > 0 ||
|
||||
(appliedReport.diskBudget?.removedEntries ?? 0) > 0 ||
|
||||
(appliedReport.diskBudget?.removedFiles ?? 0) > 0,
|
||||
applied: true,
|
||||
appliedCount: Object.keys(afterStore).length,
|
||||
};
|
||||
appliedSummaries.push(summary);
|
||||
}
|
||||
|
||||
if (opts.json) {
|
||||
if (appliedSummaries.length === 1) {
|
||||
writeRuntimeJson(runtime, appliedSummaries[0] ?? {});
|
||||
return;
|
||||
}
|
||||
writeRuntimeJson(runtime, {
|
||||
allAgents: true,
|
||||
mode,
|
||||
dryRun: false,
|
||||
stores: appliedSummaries,
|
||||
});
|
||||
writeRuntimeJson(
|
||||
runtime,
|
||||
serializeSessionCleanupResult({
|
||||
mode,
|
||||
dryRun: false,
|
||||
summaries: appliedSummaries,
|
||||
}),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
for (let i = 0; i < appliedSummaries.length; i += 1) {
|
||||
const summary = appliedSummaries[i];
|
||||
if (i > 0) {
|
||||
runtime.log("");
|
||||
}
|
||||
if (appliedSummaries.length > 1) {
|
||||
runtime.log(`Agent: ${summary.agentId}`);
|
||||
}
|
||||
runtime.log(`Session store: ${summary.storePath}`);
|
||||
runtime.log(`Applied maintenance. Current entries: ${summary.appliedCount ?? 0}`);
|
||||
}
|
||||
renderAppliedSummaries({ summaries: appliedSummaries, runtime });
|
||||
}
|
||||
|
||||
@@ -184,6 +184,7 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
|
||||
"cron.run",
|
||||
"sessions.patch",
|
||||
"sessions.pluginPatch",
|
||||
"sessions.cleanup",
|
||||
"sessions.reset",
|
||||
"sessions.delete",
|
||||
"sessions.compact",
|
||||
|
||||
@@ -256,6 +256,8 @@ import {
|
||||
SessionsAbortParamsSchema,
|
||||
type SessionsCompactParams,
|
||||
SessionsCompactParamsSchema,
|
||||
type SessionsCleanupParams,
|
||||
SessionsCleanupParamsSchema,
|
||||
type SessionsCompactionBranchParams,
|
||||
SessionsCompactionBranchParamsSchema,
|
||||
type SessionsCompactionGetParams,
|
||||
@@ -444,6 +446,9 @@ export const validateSecretsResolveResult = ajv.compile<SecretsResolveResult>(
|
||||
SecretsResolveResultSchema,
|
||||
);
|
||||
export const validateSessionsListParams = ajv.compile<SessionsListParams>(SessionsListParamsSchema);
|
||||
export const validateSessionsCleanupParams = ajv.compile<SessionsCleanupParams>(
|
||||
SessionsCleanupParamsSchema,
|
||||
);
|
||||
export const validateSessionsPreviewParams = ajv.compile<SessionsPreviewParams>(
|
||||
SessionsPreviewParamsSchema,
|
||||
);
|
||||
@@ -693,6 +698,7 @@ export {
|
||||
NodePendingEnqueueParamsSchema,
|
||||
NodePendingEnqueueResultSchema,
|
||||
SessionsListParamsSchema,
|
||||
SessionsCleanupParamsSchema,
|
||||
SessionsPreviewParamsSchema,
|
||||
SessionsResolveParamsSchema,
|
||||
SessionsCompactionListParamsSchema,
|
||||
@@ -918,6 +924,7 @@ export type {
|
||||
NodePendingEnqueueParams,
|
||||
NodePendingEnqueueResult,
|
||||
SessionsListParams,
|
||||
SessionsCleanupParams,
|
||||
SessionsPreviewParams,
|
||||
SessionsResolveParams,
|
||||
SessionsPatchParams,
|
||||
|
||||
@@ -201,6 +201,7 @@ import {
|
||||
SessionsCompactionRestoreParamsSchema,
|
||||
SessionsCompactionRestoreResultSchema,
|
||||
SessionCompactionCheckpointSchema,
|
||||
SessionsCleanupParamsSchema,
|
||||
SessionsCreateParamsSchema,
|
||||
SessionsDeleteParamsSchema,
|
||||
SessionsListParamsSchema,
|
||||
@@ -275,6 +276,7 @@ export const ProtocolSchemas = {
|
||||
SecretsResolveAssignment: SecretsResolveAssignmentSchema,
|
||||
SecretsResolveResult: SecretsResolveResultSchema,
|
||||
SessionsListParams: SessionsListParamsSchema,
|
||||
SessionsCleanupParams: SessionsCleanupParamsSchema,
|
||||
SessionsPreviewParams: SessionsPreviewParamsSchema,
|
||||
SessionsResolveParams: SessionsResolveParamsSchema,
|
||||
SessionCompactionCheckpoint: SessionCompactionCheckpointSchema,
|
||||
|
||||
@@ -60,6 +60,17 @@ export const SessionsListParamsSchema = Type.Object(
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const SessionsCleanupParamsSchema = Type.Object(
|
||||
{
|
||||
agent: Type.Optional(NonEmptyString),
|
||||
allAgents: Type.Optional(Type.Boolean()),
|
||||
enforce: Type.Optional(Type.Boolean()),
|
||||
activeKey: Type.Optional(NonEmptyString),
|
||||
fixMissing: Type.Optional(Type.Boolean()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const SessionsPreviewParamsSchema = Type.Object(
|
||||
{
|
||||
keys: Type.Array(NonEmptyString, { minItems: 1 }),
|
||||
|
||||
@@ -44,6 +44,7 @@ export type NodePendingEnqueueResult = SchemaType<"NodePendingEnqueueResult">;
|
||||
export type PushTestParams = SchemaType<"PushTestParams">;
|
||||
export type PushTestResult = SchemaType<"PushTestResult">;
|
||||
export type SessionsListParams = SchemaType<"SessionsListParams">;
|
||||
export type SessionsCleanupParams = SchemaType<"SessionsCleanupParams">;
|
||||
export type SessionsPreviewParams = SchemaType<"SessionsPreviewParams">;
|
||||
export type SessionsResolveParams = SchemaType<"SessionsResolveParams">;
|
||||
export type SessionCompactionCheckpoint = SchemaType<"SessionCompactionCheckpoint">;
|
||||
|
||||
@@ -105,6 +105,7 @@ const BASE_METHODS = [
|
||||
"sessions.abort",
|
||||
"sessions.patch",
|
||||
"sessions.pluginPatch",
|
||||
"sessions.cleanup",
|
||||
"sessions.reset",
|
||||
"sessions.delete",
|
||||
"sessions.compact",
|
||||
|
||||
@@ -12,6 +12,7 @@ import {
|
||||
import { compactEmbeddedPiSession } from "../../agents/pi-embedded.js";
|
||||
import { clearSessionQueues } from "../../auto-reply/reply/queue/cleanup.js";
|
||||
import { normalizeReasoningLevel, normalizeThinkLevel } from "../../auto-reply/thinking.js";
|
||||
import { runSessionsCleanup } from "../../commands/sessions-cleanup.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
resolveMainSessionKey,
|
||||
@@ -47,6 +48,7 @@ import {
|
||||
ErrorCodes,
|
||||
errorShape,
|
||||
validateSessionsAbortParams,
|
||||
validateSessionsCleanupParams,
|
||||
validateSessionsCompactParams,
|
||||
validateSessionsCompactionBranchParams,
|
||||
validateSessionsCompactionGetParams,
|
||||
@@ -663,6 +665,46 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
});
|
||||
respond(true, result, undefined);
|
||||
},
|
||||
"sessions.cleanup": async ({ params, respond, context }) => {
|
||||
if (!assertValidParams(params, validateSessionsCleanupParams, "sessions.cleanup", respond)) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const { mode, appliedSummaries } = await runSessionsCleanup({
|
||||
cfg: context.getRuntimeConfig(),
|
||||
opts: {
|
||||
agent: params.agent,
|
||||
allAgents: params.allAgents,
|
||||
enforce: params.enforce,
|
||||
activeKey: params.activeKey,
|
||||
fixMissing: params.fixMissing,
|
||||
},
|
||||
});
|
||||
const result =
|
||||
appliedSummaries.length === 1
|
||||
? (appliedSummaries[0] ?? {})
|
||||
: {
|
||||
allAgents: true,
|
||||
mode,
|
||||
dryRun: false,
|
||||
stores: appliedSummaries,
|
||||
};
|
||||
respond(true, result, undefined);
|
||||
for (const summary of appliedSummaries) {
|
||||
emitSessionsChanged(context, {
|
||||
reason: "cleanup",
|
||||
sessionKey: undefined,
|
||||
});
|
||||
if (summary.wouldMutate) {
|
||||
context.logGateway.debug(
|
||||
`sessions.cleanup applied ${summary.storePath}: ${summary.beforeCount} -> ${summary.afterCount}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, formatErrorMessage(error)));
|
||||
}
|
||||
},
|
||||
"sessions.subscribe": ({ client, context, respond }) => {
|
||||
const connId = client?.connId?.trim();
|
||||
if (connId) {
|
||||
|
||||
@@ -67,6 +67,7 @@ test("lists and patches session store via sessions.* RPC", async () => {
|
||||
expect.arrayContaining([
|
||||
"sessions.list",
|
||||
"sessions.preview",
|
||||
"sessions.cleanup",
|
||||
"sessions.patch",
|
||||
"sessions.reset",
|
||||
"sessions.delete",
|
||||
@@ -78,6 +79,7 @@ test("lists and patches session store via sessions.* RPC", async () => {
|
||||
const directContext = {
|
||||
broadcastToConnIds: vi.fn(),
|
||||
getSessionEventSubscriberConnIds: () => new Set<string>(),
|
||||
logGateway: { debug: vi.fn() },
|
||||
loadGatewayModelCatalog: async () => piSdkMock.models,
|
||||
getRuntimeConfig: getRuntimeConfig,
|
||||
} as never;
|
||||
@@ -305,6 +307,23 @@ test("lists and patches session store via sessions.* RPC", async () => {
|
||||
});
|
||||
expect(spawnedPatchedInvalidKey.ok).toBe(false);
|
||||
|
||||
const cleaned = await directSessionReq<{
|
||||
applied: true;
|
||||
missing: number;
|
||||
appliedCount: number;
|
||||
}>("sessions.cleanup", {
|
||||
enforce: true,
|
||||
fixMissing: true,
|
||||
});
|
||||
expect(cleaned.ok).toBe(true);
|
||||
expect(cleaned.payload?.missing).toBeGreaterThanOrEqual(1);
|
||||
const listAfterCleanup = await directSessionReq<{
|
||||
sessions: Array<{ key: string }>;
|
||||
}>("sessions.list", {});
|
||||
expect(listAfterCleanup.payload?.sessions.some((s) => s.key === "agent:main:subagent:one")).toBe(
|
||||
false,
|
||||
);
|
||||
|
||||
piSdkMock.enabled = true;
|
||||
piSdkMock.models = [{ id: "gpt-test-a", name: "A", provider: "openai" }];
|
||||
const modelPatched = await directSessionReq<{
|
||||
|
||||
Reference in New Issue
Block a user