Add session lifecycle gateway methods

This commit is contained in:
Tyler Yust
2026-03-12 10:02:25 -07:00
parent 2beb2afdd7
commit c8ae47a9fe
10 changed files with 428 additions and 1 deletions

View File

@@ -8,13 +8,22 @@ import { listGatewayMethods } from "./server-methods-list.js";
import { coreGatewayHandlers } from "./server-methods.js";
describe("method scope resolution", () => {
it("classifies sessions.resolve + config.schema.lookup as read and poll as write", () => {
it("classifies session dashboard lifecycle methods with least privilege scopes", () => {
expect(resolveLeastPrivilegeOperatorScopesForMethod("sessions.resolve")).toEqual([
"operator.read",
]);
expect(resolveLeastPrivilegeOperatorScopesForMethod("config.schema.lookup")).toEqual([
"operator.read",
]);
expect(resolveLeastPrivilegeOperatorScopesForMethod("sessions.create")).toEqual([
"operator.write",
]);
expect(resolveLeastPrivilegeOperatorScopesForMethod("sessions.send")).toEqual([
"operator.write",
]);
expect(resolveLeastPrivilegeOperatorScopesForMethod("sessions.abort")).toEqual([
"operator.write",
]);
expect(resolveLeastPrivilegeOperatorScopesForMethod("poll")).toEqual(["operator.write"]);
});

View File

@@ -104,6 +104,9 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
"node.invoke",
"chat.send",
"chat.abort",
"sessions.create",
"sessions.send",
"sessions.abort",
"browser.request",
"push.test",
"node.pending.enqueue",

View File

@@ -186,8 +186,12 @@ import {
type SecretsResolveResult,
SecretsResolveParamsSchema,
SecretsResolveResultSchema,
type SessionsAbortParams,
SessionsAbortParamsSchema,
type SessionsCompactParams,
SessionsCompactParamsSchema,
type SessionsCreateParams,
SessionsCreateParamsSchema,
type SessionsDeleteParams,
SessionsDeleteParamsSchema,
type SessionsListParams,
@@ -200,6 +204,8 @@ import {
SessionsResetParamsSchema,
type SessionsResolveParams,
SessionsResolveParamsSchema,
type SessionsSendParams,
SessionsSendParamsSchema,
type SessionsUsageParams,
SessionsUsageParamsSchema,
type ShutdownEvent,
@@ -324,6 +330,12 @@ export const validateSessionsPreviewParams = ajv.compile<SessionsPreviewParams>(
export const validateSessionsResolveParams = ajv.compile<SessionsResolveParams>(
SessionsResolveParamsSchema,
);
export const validateSessionsCreateParams = ajv.compile<SessionsCreateParams>(
SessionsCreateParamsSchema,
);
export const validateSessionsSendParams = ajv.compile<SessionsSendParams>(SessionsSendParamsSchema);
export const validateSessionsAbortParams =
ajv.compile<SessionsAbortParams>(SessionsAbortParamsSchema);
export const validateSessionsPatchParams =
ajv.compile<SessionsPatchParams>(SessionsPatchParamsSchema);
export const validateSessionsResetParams =
@@ -492,6 +504,10 @@ export {
NodePendingEnqueueResultSchema,
SessionsListParamsSchema,
SessionsPreviewParamsSchema,
SessionsResolveParamsSchema,
SessionsCreateParamsSchema,
SessionsSendParamsSchema,
SessionsAbortParamsSchema,
SessionsPatchParamsSchema,
SessionsResetParamsSchema,
SessionsDeleteParamsSchema,

View File

@@ -138,13 +138,16 @@ import {
SecretsResolveResultSchema,
} from "./secrets.js";
import {
SessionsAbortParamsSchema,
SessionsCompactParamsSchema,
SessionsCreateParamsSchema,
SessionsDeleteParamsSchema,
SessionsListParamsSchema,
SessionsPatchParamsSchema,
SessionsPreviewParamsSchema,
SessionsResetParamsSchema,
SessionsResolveParamsSchema,
SessionsSendParamsSchema,
SessionsUsageParamsSchema,
} from "./sessions.js";
import { PresenceEntrySchema, SnapshotSchema, StateVersionSchema } from "./snapshot.js";
@@ -204,6 +207,9 @@ export const ProtocolSchemas = {
SessionsListParams: SessionsListParamsSchema,
SessionsPreviewParams: SessionsPreviewParamsSchema,
SessionsResolveParams: SessionsResolveParamsSchema,
SessionsCreateParams: SessionsCreateParamsSchema,
SessionsSendParams: SessionsSendParamsSchema,
SessionsAbortParams: SessionsAbortParamsSchema,
SessionsPatchParams: SessionsPatchParamsSchema,
SessionsResetParams: SessionsResetParamsSchema,
SessionsDeleteParams: SessionsDeleteParamsSchema,

View File

@@ -47,6 +47,34 @@ export const SessionsResolveParamsSchema = Type.Object(
{ additionalProperties: false },
);
export const SessionsCreateParamsSchema = Type.Object(
{
agentId: Type.Optional(NonEmptyString),
label: Type.Optional(SessionLabelString),
},
{ additionalProperties: false },
);
export const SessionsSendParamsSchema = Type.Object(
{
key: NonEmptyString,
message: Type.String(),
thinking: Type.Optional(Type.String()),
attachments: Type.Optional(Type.Array(Type.Unknown())),
timeoutMs: Type.Optional(Type.Integer({ minimum: 0 })),
idempotencyKey: Type.Optional(NonEmptyString),
},
{ additionalProperties: false },
);
export const SessionsAbortParamsSchema = Type.Object(
{
key: NonEmptyString,
runId: Type.Optional(NonEmptyString),
},
{ additionalProperties: false },
);
export const SessionsPatchParamsSchema = Type.Object(
{
key: NonEmptyString,

View File

@@ -41,6 +41,9 @@ export type PushTestResult = SchemaType<"PushTestResult">;
export type SessionsListParams = SchemaType<"SessionsListParams">;
export type SessionsPreviewParams = SchemaType<"SessionsPreviewParams">;
export type SessionsResolveParams = SchemaType<"SessionsResolveParams">;
export type SessionsCreateParams = SchemaType<"SessionsCreateParams">;
export type SessionsSendParams = SchemaType<"SessionsSendParams">;
export type SessionsAbortParams = SchemaType<"SessionsAbortParams">;
export type SessionsPatchParams = SchemaType<"SessionsPatchParams">;
export type SessionsResetParams = SchemaType<"SessionsResetParams">;
export type SessionsDeleteParams = SchemaType<"SessionsDeleteParams">;

View File

@@ -57,6 +57,9 @@ const BASE_METHODS = [
"sessions.subscribe",
"sessions.unsubscribe",
"sessions.preview",
"sessions.create",
"sessions.send",
"sessions.abort",
"sessions.patch",
"sessions.reset",
"sessions.delete",

View File

@@ -1,9 +1,14 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent";
import { resolveDefaultAgentId } from "../../agents/agent-scope.js";
import { loadConfig } from "../../config/config.js";
import {
loadSessionStore,
resolveMainSessionKey,
resolveSessionFilePath,
resolveSessionFilePathOptions,
type SessionEntry,
updateSessionStore,
} from "../../config/sessions.js";
@@ -12,13 +17,16 @@ import { GATEWAY_CLIENT_IDS } from "../protocol/client-info.js";
import {
ErrorCodes,
errorShape,
validateSessionsAbortParams,
validateSessionsCompactParams,
validateSessionsCreateParams,
validateSessionsDeleteParams,
validateSessionsListParams,
validateSessionsPatchParams,
validateSessionsPreviewParams,
validateSessionsResetParams,
validateSessionsResolveParams,
validateSessionsSendParams,
} from "../protocol/index.js";
import {
archiveSessionTranscriptsForSession,
@@ -43,6 +51,7 @@ import {
} from "../session-utils.js";
import { applySessionsPatchToStore } from "../sessions-patch.js";
import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js";
import { chatHandlers } from "./chat.js";
import type {
GatewayClient,
GatewayRequestContext,
@@ -141,6 +150,72 @@ function migrateAndPruneSessionStoreKey(params: {
return { target, primaryKey, entry: params.store[primaryKey] };
}
function buildDashboardSessionKey(agentId: string): string {
return `agent:${agentId}:dashboard:${randomUUID()}`;
}
function ensureSessionTranscriptFile(params: {
sessionId: string;
storePath: string;
sessionFile?: string;
agentId: string;
}): { ok: true; transcriptPath: string } | { ok: false; error: string } {
try {
const transcriptPath = resolveSessionFilePath(
params.sessionId,
params.sessionFile ? { sessionFile: params.sessionFile } : undefined,
resolveSessionFilePathOptions({
storePath: params.storePath,
agentId: params.agentId,
}),
);
if (!fs.existsSync(transcriptPath)) {
fs.mkdirSync(path.dirname(transcriptPath), { recursive: true });
const header = {
type: "session",
version: CURRENT_SESSION_VERSION,
id: params.sessionId,
timestamp: new Date().toISOString(),
cwd: process.cwd(),
};
fs.writeFileSync(transcriptPath, `${JSON.stringify(header)}\n`, {
encoding: "utf-8",
mode: 0o600,
});
}
return { ok: true, transcriptPath };
} catch (err) {
return {
ok: false,
error: err instanceof Error ? err.message : String(err),
};
}
}
function resolveAbortSessionKey(params: {
context: Pick<GatewayRequestContext, "chatAbortControllers">;
requestedKey: string;
canonicalKey: string;
runId?: string;
}): string {
const activeRunKey =
typeof params.runId === "string"
? params.context.chatAbortControllers.get(params.runId)?.sessionKey
: undefined;
if (activeRunKey) {
return activeRunKey;
}
for (const active of params.context.chatAbortControllers.values()) {
if (active.sessionKey === params.canonicalKey) {
return params.canonicalKey;
}
if (active.sessionKey === params.requestedKey) {
return params.requestedKey;
}
}
return params.requestedKey;
}
export const sessionsHandlers: GatewayRequestHandlers = {
"sessions.list": ({ params, respond }) => {
if (!assertValidParams(params, validateSessionsListParams, "sessions.list", respond)) {
@@ -247,6 +322,154 @@ export const sessionsHandlers: GatewayRequestHandlers = {
}
respond(true, { ok: true, key: resolved.key }, undefined);
},
"sessions.create": async ({ params, respond, context }) => {
if (!assertValidParams(params, validateSessionsCreateParams, "sessions.create", respond)) {
return;
}
const p = params;
const cfg = loadConfig();
const agentId = normalizeAgentId(
typeof p.agentId === "string" && p.agentId.trim() ? p.agentId : resolveDefaultAgentId(cfg),
);
const key = buildDashboardSessionKey(agentId);
const target = resolveGatewaySessionStoreTarget({ cfg, key });
const created = await updateSessionStore(target.storePath, async (store) => {
return await applySessionsPatchToStore({
cfg,
store,
storeKey: target.canonicalKey,
patch: {
key: target.canonicalKey,
label: typeof p.label === "string" ? p.label.trim() : undefined,
},
loadGatewayModelCatalog: context.loadGatewayModelCatalog,
});
});
if (!created.ok) {
respond(false, undefined, created.error);
return;
}
const ensured = ensureSessionTranscriptFile({
sessionId: created.entry.sessionId,
storePath: target.storePath,
sessionFile: created.entry.sessionFile,
agentId,
});
if (!ensured.ok) {
await updateSessionStore(target.storePath, (store) => {
delete store[target.canonicalKey];
});
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, `failed to create session transcript: ${ensured.error}`),
);
return;
}
respond(
true,
{
ok: true,
key: target.canonicalKey,
sessionId: created.entry.sessionId,
entry: created.entry,
},
undefined,
);
emitSessionsChanged(context, {
sessionKey: target.canonicalKey,
reason: "create",
});
},
"sessions.send": async ({ req, params, respond, context, client, isWebchatConnect }) => {
if (!assertValidParams(params, validateSessionsSendParams, "sessions.send", respond)) {
return;
}
const p = params;
const key = requireSessionKey(p.key, respond);
if (!key) {
return;
}
const { entry, canonicalKey } = loadSessionEntry(key);
if (!entry?.sessionId) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, `session not found: ${key}`),
);
return;
}
let sendAcked = false;
await chatHandlers["chat.send"]({
req,
params: {
sessionKey: canonicalKey,
message: p.message,
thinking: p.thinking,
attachments: p.attachments,
timeoutMs: p.timeoutMs,
idempotencyKey:
typeof p.idempotencyKey === "string" && p.idempotencyKey.trim()
? p.idempotencyKey.trim()
: randomUUID(),
},
respond: (ok, payload, error, meta) => {
sendAcked = ok;
respond(ok, payload, error, meta);
},
context,
client,
isWebchatConnect,
});
if (sendAcked) {
emitSessionsChanged(context, {
sessionKey: canonicalKey,
reason: "send",
});
}
},
"sessions.abort": async ({ req, params, respond, context, client, isWebchatConnect }) => {
if (!assertValidParams(params, validateSessionsAbortParams, "sessions.abort", respond)) {
return;
}
const p = params;
const key = requireSessionKey(p.key, respond);
if (!key) {
return;
}
const { canonicalKey } = loadSessionEntry(key);
const abortSessionKey = resolveAbortSessionKey({
context,
requestedKey: key,
canonicalKey,
runId: typeof p.runId === "string" ? p.runId : undefined,
});
let aborted = false;
await chatHandlers["chat.abort"]({
req,
params: {
sessionKey: abortSessionKey,
runId: typeof p.runId === "string" ? p.runId : undefined,
},
respond: (ok, payload, error, meta) => {
aborted =
ok &&
Boolean(
payload && typeof payload === "object" && (payload as { aborted?: boolean }).aborted,
);
respond(ok, payload, error, meta);
},
context,
client,
isWebchatConnect,
});
if (aborted) {
emitSessionsChanged(context, {
sessionKey: canonicalKey,
reason: "abort",
});
}
},
"sessions.patch": async ({ params, respond, context, client, isWebchatConnect }) => {
if (!assertValidParams(params, validateSessionsPatchParams, "sessions.patch", respond)) {
return;

View File

@@ -171,6 +171,101 @@ describe("gateway server chat", () => {
};
};
test("sessions.send forwards dashboard messages into existing sessions", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-send-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
try {
await writeSessionStore({
entries: {
"agent:main:dashboard:test-send": {
sessionId: "sess-dashboard-send",
updatedAt: Date.now(),
},
},
});
const spy = vi.mocked(getReplyFromConfig);
const callsBefore = spy.mock.calls.length;
const res = await rpcReq(ws, "sessions.send", {
key: "agent:main:dashboard:test-send",
message: "hello from dashboard",
idempotencyKey: "idem-sessions-send-1",
});
expect(res.ok).toBe(true);
expect(res.payload?.runId).toBe("idem-sessions-send-1");
await waitFor(() => spy.mock.calls.length > callsBefore, 1_000);
const ctx = spy.mock.calls.at(-1)?.[0] as { Body?: string; SessionKey?: string } | undefined;
expect(ctx?.Body).toContain("hello from dashboard");
expect(ctx?.SessionKey).toBe("agent:main:dashboard:test-send");
} finally {
testState.sessionStorePath = undefined;
await fs.rm(dir, { recursive: true, force: true });
}
});
test("sessions.abort stops active dashboard runs", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-abort-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
try {
await writeSessionStore({
entries: {
"agent:main:dashboard:test-abort": {
sessionId: "sess-dashboard-abort",
updatedAt: Date.now(),
},
},
});
let aborted = false;
const spy = vi.mocked(getReplyFromConfig);
spy.mockImplementationOnce(async (_ctx, opts) => {
const signal = opts?.abortSignal;
await new Promise<void>((resolve) => {
if (!signal) {
resolve();
return;
}
if (signal.aborted) {
aborted = true;
resolve();
return;
}
signal.addEventListener(
"abort",
() => {
aborted = true;
resolve();
},
{ once: true },
);
});
return undefined;
});
const sendRes = await rpcReq(ws, "sessions.send", {
key: "agent:main:dashboard:test-abort",
message: "hello",
idempotencyKey: "idem-sessions-abort-1",
timeoutMs: 30_000,
});
expect(sendRes.ok).toBe(true);
await waitFor(() => spy.mock.calls.length > 0, 1_000);
const abortRes = await rpcReq(ws, "sessions.abort", {
key: "agent:main:dashboard:test-abort",
runId: "idem-sessions-abort-1",
});
expect(abortRes.ok).toBe(true);
expect(abortRes.payload?.aborted).toBe(true);
await waitFor(() => aborted, 1_000);
} finally {
testState.sessionStorePath = undefined;
await fs.rm(dir, { recursive: true, force: true });
}
});
test("sanitizes inbound chat.send message text and rejects null bytes", async () => {
const nullByteRes = await rpcReq(ws, "chat.send", {
sessionKey: "main",

View File

@@ -233,6 +233,47 @@ describe("gateway server sessions", () => {
browserSessionTabMocks.closeTrackedBrowserTabsForSessions.mockResolvedValue(0);
});
test("sessions.create creates a dashboard session entry and transcript", async () => {
const { dir, storePath } = await createSessionStoreDir();
const { ws } = await openClient();
const created = await rpcReq<{
key?: string;
sessionId?: string;
entry?: { label?: string };
}>(ws, "sessions.create", {
agentId: "ops",
label: "Dashboard Chat",
});
expect(created.ok).toBe(true);
expect(created.payload?.key).toMatch(/^agent:ops:dashboard:/);
expect(created.payload?.entry?.label).toBe("Dashboard Chat");
expect(created.payload?.sessionId).toMatch(
/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/,
);
const rawStore = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<
string,
{ sessionId?: string; label?: string }
>;
const key = created.payload?.key as string;
expect(rawStore[key]).toMatchObject({
sessionId: created.payload?.sessionId,
label: "Dashboard Chat",
});
const transcriptPath = path.join(dir, `${created.payload?.sessionId}.jsonl`);
const transcript = await fs.readFile(transcriptPath, "utf-8");
const [headerLine] = transcript.trim().split(/\r?\n/, 1);
expect(JSON.parse(headerLine) as { type?: string; id?: string }).toMatchObject({
type: "session",
id: created.payload?.sessionId,
});
ws.close();
});
test("lists and patches session store via sessions.* RPC", async () => {
const { dir, storePath } = await createSessionStoreDir();
const now = Date.now();