From c8ae47a9fe356445f50612f0bc036ca0db1fee9e Mon Sep 17 00:00:00 2001 From: Tyler Yust Date: Thu, 12 Mar 2026 10:02:25 -0700 Subject: [PATCH] Add session lifecycle gateway methods --- src/gateway/method-scopes.test.ts | 11 +- src/gateway/method-scopes.ts | 3 + src/gateway/protocol/index.ts | 16 ++ .../protocol/schema/protocol-schemas.ts | 6 + src/gateway/protocol/schema/sessions.ts | 28 +++ src/gateway/protocol/schema/types.ts | 3 + src/gateway/server-methods-list.ts | 3 + src/gateway/server-methods/sessions.ts | 223 ++++++++++++++++++ .../server.chat.gateway-server-chat.test.ts | 95 ++++++++ ...sessions.gateway-server-sessions-a.test.ts | 41 ++++ 10 files changed, 428 insertions(+), 1 deletion(-) diff --git a/src/gateway/method-scopes.test.ts b/src/gateway/method-scopes.test.ts index 18ff74509ee..ba47b3469b4 100644 --- a/src/gateway/method-scopes.test.ts +++ b/src/gateway/method-scopes.test.ts @@ -8,13 +8,22 @@ import { listGatewayMethods } from "./server-methods-list.js"; import { coreGatewayHandlers } from "./server-methods.js"; describe("method scope resolution", () => { - it("classifies sessions.resolve + config.schema.lookup as read and poll as write", () => { + it("classifies session dashboard lifecycle methods with least privilege scopes", () => { expect(resolveLeastPrivilegeOperatorScopesForMethod("sessions.resolve")).toEqual([ "operator.read", ]); expect(resolveLeastPrivilegeOperatorScopesForMethod("config.schema.lookup")).toEqual([ "operator.read", ]); + expect(resolveLeastPrivilegeOperatorScopesForMethod("sessions.create")).toEqual([ + "operator.write", + ]); + expect(resolveLeastPrivilegeOperatorScopesForMethod("sessions.send")).toEqual([ + "operator.write", + ]); + expect(resolveLeastPrivilegeOperatorScopesForMethod("sessions.abort")).toEqual([ + "operator.write", + ]); expect(resolveLeastPrivilegeOperatorScopesForMethod("poll")).toEqual(["operator.write"]); }); diff --git a/src/gateway/method-scopes.ts b/src/gateway/method-scopes.ts index 6282c22365e..830ed9f2f16 100644 --- a/src/gateway/method-scopes.ts +++ b/src/gateway/method-scopes.ts @@ -104,6 +104,9 @@ const METHOD_SCOPE_GROUPS: Record = { "node.invoke", "chat.send", "chat.abort", + "sessions.create", + "sessions.send", + "sessions.abort", "browser.request", "push.test", "node.pending.enqueue", diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index 9c469333363..ad483853b03 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -186,8 +186,12 @@ import { type SecretsResolveResult, SecretsResolveParamsSchema, SecretsResolveResultSchema, + type SessionsAbortParams, + SessionsAbortParamsSchema, type SessionsCompactParams, SessionsCompactParamsSchema, + type SessionsCreateParams, + SessionsCreateParamsSchema, type SessionsDeleteParams, SessionsDeleteParamsSchema, type SessionsListParams, @@ -200,6 +204,8 @@ import { SessionsResetParamsSchema, type SessionsResolveParams, SessionsResolveParamsSchema, + type SessionsSendParams, + SessionsSendParamsSchema, type SessionsUsageParams, SessionsUsageParamsSchema, type ShutdownEvent, @@ -324,6 +330,12 @@ export const validateSessionsPreviewParams = ajv.compile( export const validateSessionsResolveParams = ajv.compile( SessionsResolveParamsSchema, ); +export const validateSessionsCreateParams = ajv.compile( + SessionsCreateParamsSchema, +); +export const validateSessionsSendParams = ajv.compile(SessionsSendParamsSchema); +export const validateSessionsAbortParams = + ajv.compile(SessionsAbortParamsSchema); export const validateSessionsPatchParams = ajv.compile(SessionsPatchParamsSchema); export const validateSessionsResetParams = @@ -492,6 +504,10 @@ export { NodePendingEnqueueResultSchema, SessionsListParamsSchema, SessionsPreviewParamsSchema, + SessionsResolveParamsSchema, + SessionsCreateParamsSchema, + SessionsSendParamsSchema, + SessionsAbortParamsSchema, SessionsPatchParamsSchema, SessionsResetParamsSchema, SessionsDeleteParamsSchema, diff --git a/src/gateway/protocol/schema/protocol-schemas.ts b/src/gateway/protocol/schema/protocol-schemas.ts index 574a74d8d41..d3d1103f848 100644 --- a/src/gateway/protocol/schema/protocol-schemas.ts +++ b/src/gateway/protocol/schema/protocol-schemas.ts @@ -138,13 +138,16 @@ import { SecretsResolveResultSchema, } from "./secrets.js"; import { + SessionsAbortParamsSchema, SessionsCompactParamsSchema, + SessionsCreateParamsSchema, SessionsDeleteParamsSchema, SessionsListParamsSchema, SessionsPatchParamsSchema, SessionsPreviewParamsSchema, SessionsResetParamsSchema, SessionsResolveParamsSchema, + SessionsSendParamsSchema, SessionsUsageParamsSchema, } from "./sessions.js"; import { PresenceEntrySchema, SnapshotSchema, StateVersionSchema } from "./snapshot.js"; @@ -204,6 +207,9 @@ export const ProtocolSchemas = { SessionsListParams: SessionsListParamsSchema, SessionsPreviewParams: SessionsPreviewParamsSchema, SessionsResolveParams: SessionsResolveParamsSchema, + SessionsCreateParams: SessionsCreateParamsSchema, + SessionsSendParams: SessionsSendParamsSchema, + SessionsAbortParams: SessionsAbortParamsSchema, SessionsPatchParams: SessionsPatchParamsSchema, SessionsResetParams: SessionsResetParamsSchema, SessionsDeleteParams: SessionsDeleteParamsSchema, diff --git a/src/gateway/protocol/schema/sessions.ts b/src/gateway/protocol/schema/sessions.ts index 743700b9a48..1851782b40a 100644 --- a/src/gateway/protocol/schema/sessions.ts +++ b/src/gateway/protocol/schema/sessions.ts @@ -47,6 +47,34 @@ export const SessionsResolveParamsSchema = Type.Object( { additionalProperties: false }, ); +export const SessionsCreateParamsSchema = Type.Object( + { + agentId: Type.Optional(NonEmptyString), + label: Type.Optional(SessionLabelString), + }, + { additionalProperties: false }, +); + +export const SessionsSendParamsSchema = Type.Object( + { + key: NonEmptyString, + message: Type.String(), + thinking: Type.Optional(Type.String()), + attachments: Type.Optional(Type.Array(Type.Unknown())), + timeoutMs: Type.Optional(Type.Integer({ minimum: 0 })), + idempotencyKey: Type.Optional(NonEmptyString), + }, + { additionalProperties: false }, +); + +export const SessionsAbortParamsSchema = Type.Object( + { + key: NonEmptyString, + runId: Type.Optional(NonEmptyString), + }, + { additionalProperties: false }, +); + export const SessionsPatchParamsSchema = Type.Object( { key: NonEmptyString, diff --git a/src/gateway/protocol/schema/types.ts b/src/gateway/protocol/schema/types.ts index 56656aff1a3..f13aafe6d69 100644 --- a/src/gateway/protocol/schema/types.ts +++ b/src/gateway/protocol/schema/types.ts @@ -41,6 +41,9 @@ export type PushTestResult = SchemaType<"PushTestResult">; export type SessionsListParams = SchemaType<"SessionsListParams">; export type SessionsPreviewParams = SchemaType<"SessionsPreviewParams">; export type SessionsResolveParams = SchemaType<"SessionsResolveParams">; +export type SessionsCreateParams = SchemaType<"SessionsCreateParams">; +export type SessionsSendParams = SchemaType<"SessionsSendParams">; +export type SessionsAbortParams = SchemaType<"SessionsAbortParams">; export type SessionsPatchParams = SchemaType<"SessionsPatchParams">; export type SessionsResetParams = SchemaType<"SessionsResetParams">; export type SessionsDeleteParams = SchemaType<"SessionsDeleteParams">; diff --git a/src/gateway/server-methods-list.ts b/src/gateway/server-methods-list.ts index af31411f334..e70243d9e90 100644 --- a/src/gateway/server-methods-list.ts +++ b/src/gateway/server-methods-list.ts @@ -57,6 +57,9 @@ const BASE_METHODS = [ "sessions.subscribe", "sessions.unsubscribe", "sessions.preview", + "sessions.create", + "sessions.send", + "sessions.abort", "sessions.patch", "sessions.reset", "sessions.delete", diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index 0f969b0de95..826601e7475 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -1,9 +1,14 @@ +import { randomUUID } from "node:crypto"; import fs from "node:fs"; +import path from "node:path"; +import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent"; import { resolveDefaultAgentId } from "../../agents/agent-scope.js"; import { loadConfig } from "../../config/config.js"; import { loadSessionStore, resolveMainSessionKey, + resolveSessionFilePath, + resolveSessionFilePathOptions, type SessionEntry, updateSessionStore, } from "../../config/sessions.js"; @@ -12,13 +17,16 @@ import { GATEWAY_CLIENT_IDS } from "../protocol/client-info.js"; import { ErrorCodes, errorShape, + validateSessionsAbortParams, validateSessionsCompactParams, + validateSessionsCreateParams, validateSessionsDeleteParams, validateSessionsListParams, validateSessionsPatchParams, validateSessionsPreviewParams, validateSessionsResetParams, validateSessionsResolveParams, + validateSessionsSendParams, } from "../protocol/index.js"; import { archiveSessionTranscriptsForSession, @@ -43,6 +51,7 @@ import { } from "../session-utils.js"; import { applySessionsPatchToStore } from "../sessions-patch.js"; import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js"; +import { chatHandlers } from "./chat.js"; import type { GatewayClient, GatewayRequestContext, @@ -141,6 +150,72 @@ function migrateAndPruneSessionStoreKey(params: { return { target, primaryKey, entry: params.store[primaryKey] }; } +function buildDashboardSessionKey(agentId: string): string { + return `agent:${agentId}:dashboard:${randomUUID()}`; +} + +function ensureSessionTranscriptFile(params: { + sessionId: string; + storePath: string; + sessionFile?: string; + agentId: string; +}): { ok: true; transcriptPath: string } | { ok: false; error: string } { + try { + const transcriptPath = resolveSessionFilePath( + params.sessionId, + params.sessionFile ? { sessionFile: params.sessionFile } : undefined, + resolveSessionFilePathOptions({ + storePath: params.storePath, + agentId: params.agentId, + }), + ); + if (!fs.existsSync(transcriptPath)) { + fs.mkdirSync(path.dirname(transcriptPath), { recursive: true }); + const header = { + type: "session", + version: CURRENT_SESSION_VERSION, + id: params.sessionId, + timestamp: new Date().toISOString(), + cwd: process.cwd(), + }; + fs.writeFileSync(transcriptPath, `${JSON.stringify(header)}\n`, { + encoding: "utf-8", + mode: 0o600, + }); + } + return { ok: true, transcriptPath }; + } catch (err) { + return { + ok: false, + error: err instanceof Error ? err.message : String(err), + }; + } +} + +function resolveAbortSessionKey(params: { + context: Pick; + requestedKey: string; + canonicalKey: string; + runId?: string; +}): string { + const activeRunKey = + typeof params.runId === "string" + ? params.context.chatAbortControllers.get(params.runId)?.sessionKey + : undefined; + if (activeRunKey) { + return activeRunKey; + } + for (const active of params.context.chatAbortControllers.values()) { + if (active.sessionKey === params.canonicalKey) { + return params.canonicalKey; + } + if (active.sessionKey === params.requestedKey) { + return params.requestedKey; + } + } + return params.requestedKey; +} + export const sessionsHandlers: GatewayRequestHandlers = { "sessions.list": ({ params, respond }) => { if (!assertValidParams(params, validateSessionsListParams, "sessions.list", respond)) { @@ -247,6 +322,154 @@ export const sessionsHandlers: GatewayRequestHandlers = { } respond(true, { ok: true, key: resolved.key }, undefined); }, + "sessions.create": async ({ params, respond, context }) => { + if (!assertValidParams(params, validateSessionsCreateParams, "sessions.create", respond)) { + return; + } + const p = params; + const cfg = loadConfig(); + const agentId = normalizeAgentId( + typeof p.agentId === "string" && p.agentId.trim() ? p.agentId : resolveDefaultAgentId(cfg), + ); + const key = buildDashboardSessionKey(agentId); + const target = resolveGatewaySessionStoreTarget({ cfg, key }); + const created = await updateSessionStore(target.storePath, async (store) => { + return await applySessionsPatchToStore({ + cfg, + store, + storeKey: target.canonicalKey, + patch: { + key: target.canonicalKey, + label: typeof p.label === "string" ? p.label.trim() : undefined, + }, + loadGatewayModelCatalog: context.loadGatewayModelCatalog, + }); + }); + if (!created.ok) { + respond(false, undefined, created.error); + return; + } + const ensured = ensureSessionTranscriptFile({ + sessionId: created.entry.sessionId, + storePath: target.storePath, + sessionFile: created.entry.sessionFile, + agentId, + }); + if (!ensured.ok) { + await updateSessionStore(target.storePath, (store) => { + delete store[target.canonicalKey]; + }); + respond( + false, + undefined, + errorShape(ErrorCodes.UNAVAILABLE, `failed to create session transcript: ${ensured.error}`), + ); + return; + } + respond( + true, + { + ok: true, + key: target.canonicalKey, + sessionId: created.entry.sessionId, + entry: created.entry, + }, + undefined, + ); + emitSessionsChanged(context, { + sessionKey: target.canonicalKey, + reason: "create", + }); + }, + "sessions.send": async ({ req, params, respond, context, client, isWebchatConnect }) => { + if (!assertValidParams(params, validateSessionsSendParams, "sessions.send", respond)) { + return; + } + const p = params; + const key = requireSessionKey(p.key, respond); + if (!key) { + return; + } + const { entry, canonicalKey } = loadSessionEntry(key); + if (!entry?.sessionId) { + respond( + false, + undefined, + errorShape(ErrorCodes.INVALID_REQUEST, `session not found: ${key}`), + ); + return; + } + let sendAcked = false; + await chatHandlers["chat.send"]({ + req, + params: { + sessionKey: canonicalKey, + message: p.message, + thinking: p.thinking, + attachments: p.attachments, + timeoutMs: p.timeoutMs, + idempotencyKey: + typeof p.idempotencyKey === "string" && p.idempotencyKey.trim() + ? p.idempotencyKey.trim() + : randomUUID(), + }, + respond: (ok, payload, error, meta) => { + sendAcked = ok; + respond(ok, payload, error, meta); + }, + context, + client, + isWebchatConnect, + }); + if (sendAcked) { + emitSessionsChanged(context, { + sessionKey: canonicalKey, + reason: "send", + }); + } + }, + "sessions.abort": async ({ req, params, respond, context, client, isWebchatConnect }) => { + if (!assertValidParams(params, validateSessionsAbortParams, "sessions.abort", respond)) { + return; + } + const p = params; + const key = requireSessionKey(p.key, respond); + if (!key) { + return; + } + const { canonicalKey } = loadSessionEntry(key); + const abortSessionKey = resolveAbortSessionKey({ + context, + requestedKey: key, + canonicalKey, + runId: typeof p.runId === "string" ? p.runId : undefined, + }); + let aborted = false; + await chatHandlers["chat.abort"]({ + req, + params: { + sessionKey: abortSessionKey, + runId: typeof p.runId === "string" ? p.runId : undefined, + }, + respond: (ok, payload, error, meta) => { + aborted = + ok && + Boolean( + payload && typeof payload === "object" && (payload as { aborted?: boolean }).aborted, + ); + respond(ok, payload, error, meta); + }, + context, + client, + isWebchatConnect, + }); + if (aborted) { + emitSessionsChanged(context, { + sessionKey: canonicalKey, + reason: "abort", + }); + } + }, "sessions.patch": async ({ params, respond, context, client, isWebchatConnect }) => { if (!assertValidParams(params, validateSessionsPatchParams, "sessions.patch", respond)) { return; diff --git a/src/gateway/server.chat.gateway-server-chat.test.ts b/src/gateway/server.chat.gateway-server-chat.test.ts index 76c51cd6d78..4e1c69872b2 100644 --- a/src/gateway/server.chat.gateway-server-chat.test.ts +++ b/src/gateway/server.chat.gateway-server-chat.test.ts @@ -171,6 +171,101 @@ describe("gateway server chat", () => { }; }; + test("sessions.send forwards dashboard messages into existing sessions", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-send-")); + testState.sessionStorePath = path.join(dir, "sessions.json"); + try { + await writeSessionStore({ + entries: { + "agent:main:dashboard:test-send": { + sessionId: "sess-dashboard-send", + updatedAt: Date.now(), + }, + }, + }); + + const spy = vi.mocked(getReplyFromConfig); + const callsBefore = spy.mock.calls.length; + const res = await rpcReq(ws, "sessions.send", { + key: "agent:main:dashboard:test-send", + message: "hello from dashboard", + idempotencyKey: "idem-sessions-send-1", + }); + expect(res.ok).toBe(true); + expect(res.payload?.runId).toBe("idem-sessions-send-1"); + + await waitFor(() => spy.mock.calls.length > callsBefore, 1_000); + const ctx = spy.mock.calls.at(-1)?.[0] as { Body?: string; SessionKey?: string } | undefined; + expect(ctx?.Body).toContain("hello from dashboard"); + expect(ctx?.SessionKey).toBe("agent:main:dashboard:test-send"); + } finally { + testState.sessionStorePath = undefined; + await fs.rm(dir, { recursive: true, force: true }); + } + }); + + test("sessions.abort stops active dashboard runs", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-abort-")); + testState.sessionStorePath = path.join(dir, "sessions.json"); + try { + await writeSessionStore({ + entries: { + "agent:main:dashboard:test-abort": { + sessionId: "sess-dashboard-abort", + updatedAt: Date.now(), + }, + }, + }); + + let aborted = false; + const spy = vi.mocked(getReplyFromConfig); + spy.mockImplementationOnce(async (_ctx, opts) => { + const signal = opts?.abortSignal; + await new Promise((resolve) => { + if (!signal) { + resolve(); + return; + } + if (signal.aborted) { + aborted = true; + resolve(); + return; + } + signal.addEventListener( + "abort", + () => { + aborted = true; + resolve(); + }, + { once: true }, + ); + }); + return undefined; + }); + + const sendRes = await rpcReq(ws, "sessions.send", { + key: "agent:main:dashboard:test-abort", + message: "hello", + idempotencyKey: "idem-sessions-abort-1", + timeoutMs: 30_000, + }); + expect(sendRes.ok).toBe(true); + + await waitFor(() => spy.mock.calls.length > 0, 1_000); + + const abortRes = await rpcReq(ws, "sessions.abort", { + key: "agent:main:dashboard:test-abort", + runId: "idem-sessions-abort-1", + }); + expect(abortRes.ok).toBe(true); + expect(abortRes.payload?.aborted).toBe(true); + await waitFor(() => aborted, 1_000); + } finally { + testState.sessionStorePath = undefined; + await fs.rm(dir, { recursive: true, force: true }); + } + }); + test("sanitizes inbound chat.send message text and rejects null bytes", async () => { const nullByteRes = await rpcReq(ws, "chat.send", { sessionKey: "main", diff --git a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts index 1decc4b9178..70d6821e44b 100644 --- a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts +++ b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts @@ -233,6 +233,47 @@ describe("gateway server sessions", () => { browserSessionTabMocks.closeTrackedBrowserTabsForSessions.mockResolvedValue(0); }); + test("sessions.create creates a dashboard session entry and transcript", async () => { + const { dir, storePath } = await createSessionStoreDir(); + const { ws } = await openClient(); + + const created = await rpcReq<{ + key?: string; + sessionId?: string; + entry?: { label?: string }; + }>(ws, "sessions.create", { + agentId: "ops", + label: "Dashboard Chat", + }); + + expect(created.ok).toBe(true); + expect(created.payload?.key).toMatch(/^agent:ops:dashboard:/); + expect(created.payload?.entry?.label).toBe("Dashboard Chat"); + expect(created.payload?.sessionId).toMatch( + /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/, + ); + + const rawStore = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< + string, + { sessionId?: string; label?: string } + >; + const key = created.payload?.key as string; + expect(rawStore[key]).toMatchObject({ + sessionId: created.payload?.sessionId, + label: "Dashboard Chat", + }); + + const transcriptPath = path.join(dir, `${created.payload?.sessionId}.jsonl`); + const transcript = await fs.readFile(transcriptPath, "utf-8"); + const [headerLine] = transcript.trim().split(/\r?\n/, 1); + expect(JSON.parse(headerLine) as { type?: string; id?: string }).toMatchObject({ + type: "session", + id: created.payload?.sessionId, + }); + + ws.close(); + }); + test("lists and patches session store via sessions.* RPC", async () => { const { dir, storePath } = await createSessionStoreDir(); const now = Date.now();