diff --git a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts index 7e2493db3a7..951b3ccf1a3 100644 --- a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts +++ b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts @@ -354,6 +354,52 @@ async function getMainPreviewEntry(ws: import("ws").WebSocket) { return entry; } +type SessionsHandlers = Awaited>; + +async function directSessionReq( + method: keyof SessionsHandlers, + params: Record, + opts?: { + context?: Record; + client?: Parameters[0]["client"]; + isWebchatConnect?: Parameters[0]["isWebchatConnect"]; + coercePayload?: (payload: unknown) => TPayload; + }, +): Promise<{ ok: boolean; payload?: TPayload; error?: { code?: string; message?: string } }> { + const sessionsHandlers = await getSessionsHandlers(); + let result: + | { ok: boolean; payload?: TPayload; error?: { code?: string; message?: string } } + | undefined; + await sessionsHandlers[method]({ + req: {} as never, + params, + respond: (ok, payload, error) => { + result = { + ok, + payload: + payload === undefined + ? undefined + : opts?.coercePayload + ? opts.coercePayload(payload) + : (payload as TPayload), + error, + }; + }, + context: { + broadcastToConnIds: vi.fn(), + getSessionEventSubscriberConnIds: () => new Set(), + loadGatewayModelCatalog: async () => piSdkMock.models, + ...(opts?.context ?? {}), + } as never, + client: opts?.client ?? null, + isWebchatConnect: opts?.isWebchatConnect ?? (() => false), + }); + if (!result) { + throw new Error(`${String(method)} did not respond`); + } + return result; +} + function isInternalHookEvent(value: unknown): value is InternalHookEvent { if (!value || typeof value !== "object") { return false; @@ -417,9 +463,7 @@ describe("gateway server sessions", () => { }, }, }); - const { ws } = await openClient(); - - const created = await rpcReq<{ + const created = await directSessionReq<{ key?: string; sessionId?: string; entry?: { @@ -429,7 +473,7 @@ describe("gateway server sessions", () => { parentSessionKey?: string; sessionFile?: string; }; - }>(ws, "sessions.create", { + }>("sessions.create", { agentId: "ops", label: "Dashboard Chat", model: "openai/gpt-test-a", @@ -475,22 +519,19 @@ describe("gateway server sessions", () => { type: "session", id: created.payload?.sessionId, }); - - ws.close(); }); test("sessions.create accepts an explicit key for persistent dashboard sessions", async () => { await createSessionStoreDir(); - const { ws } = await openClient(); const key = "agent:ops-agent:dashboard:direct:subagent-orchestrator"; - const created = await rpcReq<{ + const created = await directSessionReq<{ key?: string; sessionId?: string; entry?: { label?: string; }; - }>(ws, "sessions.create", { + }>("sessions.create", { key, label: "Dashboard Orchestrator", }); @@ -501,21 +542,18 @@ describe("gateway server sessions", () => { expect(created.payload?.sessionId).toMatch( /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/, ); - - ws.close(); }); test("sessions.create scopes the main alias to the requested agent", async () => { const { storePath } = await createSessionStoreDir(); - const { ws } = await openClient(); - const created = await rpcReq<{ + const created = await directSessionReq<{ key?: string; sessionId?: string; entry?: { sessionFile?: string; }; - }>(ws, "sessions.create", { + }>("sessions.create", { key: "main", agentId: "longmemeval", }); @@ -532,21 +570,18 @@ describe("gateway server sessions", () => { >; expect(rawStore["agent:longmemeval:main"]?.sessionId).toBe(created.payload?.sessionId); expect(rawStore["agent:main:main"]).toBeUndefined(); - - ws.close(); }); test("sessions.create preserves global and unknown sentinel keys", async () => { const { storePath } = await createSessionStoreDir(); - const { ws } = await openClient(); - const globalCreated = await rpcReq<{ + const globalCreated = await directSessionReq<{ key?: string; sessionId?: string; entry?: { sessionFile?: string; }; - }>(ws, "sessions.create", { + }>("sessions.create", { key: "global", agentId: "longmemeval", }); @@ -555,13 +590,13 @@ describe("gateway server sessions", () => { expect(globalCreated.payload?.key).toBe("global"); expect(globalCreated.payload?.entry?.sessionFile).toBeTruthy(); - const unknownCreated = await rpcReq<{ + const unknownCreated = await directSessionReq<{ key?: string; sessionId?: string; entry?: { sessionFile?: string; }; - }>(ws, "sessions.create", { + }>("sessions.create", { key: "unknown", agentId: "longmemeval", }); @@ -580,15 +615,12 @@ describe("gateway server sessions", () => { expect(rawStore.unknown?.sessionId).toBe(unknownCreated.payload?.sessionId); expect(rawStore["agent:longmemeval:global"]).toBeUndefined(); expect(rawStore["agent:longmemeval:unknown"]).toBeUndefined(); - - ws.close(); }); test("sessions.create rejects unknown parentSessionKey", async () => { await createSessionStoreDir(); - const { ws } = await openClient(); - const created = await rpcReq(ws, "sessions.create", { + const created = await directSessionReq("sessions.create", { agentId: "ops", parentSessionKey: "agent:main:missing", }); @@ -597,8 +629,6 @@ describe("gateway server sessions", () => { expect((created.error as { message?: string } | undefined)?.message ?? "").toContain( "unknown parent session", ); - - ws.close(); }); test("sessions.create can start the first agent turn from an initial task", async () => { @@ -1662,12 +1692,19 @@ describe("gateway server sessions", () => { }, }); - const { ws } = await openClient(); - const entry = await getMainPreviewEntry(ws); + const preview = await directSessionReq<{ + previews: Array<{ + key: string; + status: string; + items: Array<{ role: string; text: string }>; + }>; + }>("sessions.preview", { keys: ["main"], limit: 3, maxChars: 120 }); + expect(preview.ok).toBe(true); + const entry = preview.payload?.previews[0]; + expect(entry?.key).toBe("main"); + expect(entry?.status).toBe("ok"); expect(entry?.items.map((item) => item.role)).toEqual(["assistant", "tool", "assistant"]); expect(entry?.items[1]?.text).toContain("call weather"); - - ws.close(); }); test("sessions.reset recomputes model from defaults instead of stale runtime model", async () => { @@ -1690,8 +1727,7 @@ describe("gateway server sessions", () => { }, }); - const { ws } = await openClient(); - const reset = await rpcReq<{ + const reset = await directSessionReq<{ ok: true; key: string; entry: { @@ -1701,7 +1737,7 @@ describe("gateway server sessions", () => { model?: string; contextTokens?: number; }; - }>(ws, "sessions.reset", { key: "main" }); + }>("sessions.reset", { key: "main" }); expect(reset.ok).toBe(true); expect(reset.payload?.key).toBe("agent:main:main"); @@ -1711,8 +1747,6 @@ describe("gateway server sessions", () => { expect(reset.payload?.entry.model).toBe("gpt-test-a"); expect(reset.payload?.entry.contextTokens).toBeUndefined(); await expect(fs.stat(reset.payload?.entry.sessionFile as string)).resolves.toBeTruthy(); - - ws.close(); }); test("sessions.reset preserves legacy explicit model overrides without modelOverrideSource", async () => { @@ -1736,8 +1770,7 @@ describe("gateway server sessions", () => { }, }); - const { ws } = await openClient(); - const reset = await rpcReq<{ + const reset = await directSessionReq<{ ok: true; key: string; entry: { @@ -1747,7 +1780,7 @@ describe("gateway server sessions", () => { modelProvider?: string; model?: string; }; - }>(ws, "sessions.reset", { key: "main" }); + }>("sessions.reset", { key: "main" }); expect(reset.ok).toBe(true); expect(reset.payload?.entry.providerOverride).toBe("anthropic"); @@ -1771,8 +1804,6 @@ describe("gateway server sessions", () => { expect(store["agent:main:main"]?.modelOverrideSource).toBe("user"); expect(store["agent:main:main"]?.modelProvider).toBe("anthropic"); expect(store["agent:main:main"]?.model).toBe("claude-opus-4-1"); - - ws.close(); }); test("sessions.reset clears fallback-pinned model overrides and restores the selected model", async () => { @@ -1798,8 +1829,7 @@ describe("gateway server sessions", () => { }, }); - const { ws } = await openClient(); - const reset = await rpcReq<{ + const reset = await directSessionReq<{ ok: true; key: string; entry: { @@ -1808,7 +1838,7 @@ describe("gateway server sessions", () => { modelProvider?: string; model?: string; }; - }>(ws, "sessions.reset", { key: "main" }); + }>("sessions.reset", { key: "main" }); expect(reset.ok).toBe(true); expect(reset.payload?.entry.providerOverride).toBeUndefined(); @@ -1829,8 +1859,6 @@ describe("gateway server sessions", () => { expect(store["agent:main:main"]?.modelOverride).toBeUndefined(); expect(store["agent:main:main"]?.modelProvider).toBe("openai"); expect(store["agent:main:main"]?.model).toBe("gpt-test-a"); - - ws.close(); }); test("sessions.reset follows the updated default after an auto fallback pinned an older default", async () => { @@ -1856,8 +1884,7 @@ describe("gateway server sessions", () => { }, }); - const { ws } = await openClient(); - const reset = await rpcReq<{ + const reset = await directSessionReq<{ ok: true; key: string; entry: { @@ -1866,7 +1893,7 @@ describe("gateway server sessions", () => { modelProvider?: string; model?: string; }; - }>(ws, "sessions.reset", { key: "main" }); + }>("sessions.reset", { key: "main" }); expect(reset.ok).toBe(true); expect(reset.payload?.entry.providerOverride).toBeUndefined(); @@ -1887,8 +1914,6 @@ describe("gateway server sessions", () => { expect(store["agent:main:main"]?.modelOverride).toBeUndefined(); expect(store["agent:main:main"]?.modelProvider).toBe("openai"); expect(store["agent:main:main"]?.model).toBe("gpt-test-c"); - - ws.close(); }); test("sessions.reset preserves spawned session ownership metadata", async () => { @@ -1958,8 +1983,7 @@ describe("gateway server sessions", () => { }, }); - const { ws } = await openClient(); - const reset = await rpcReq<{ + const reset = await directSessionReq<{ ok: true; key: string; entry: { @@ -2015,7 +2039,7 @@ describe("gateway server sessions", () => { }; label?: string; }; - }>(ws, "sessions.reset", { key: "subagent:child" }); + }>("sessions.reset", { key: "subagent:child" }); expect(reset.ok).toBe(true); expect(reset.payload?.entry.sessionFile).toBe(customSessionFile); @@ -2177,8 +2201,6 @@ describe("gateway server sessions", () => { threadId: "thread-1", }); expect(store["agent:main:subagent:child"]?.label).toBe("owned child"); - - ws.close(); }); test("sessions.preview resolves legacy mixed-case main alias with custom mainKey", async () => { @@ -2418,12 +2440,10 @@ describe("gateway server sessions", () => { embeddedRunMock.activeIds.add("sess-active"); embeddedRunMock.waitResults.set("sess-active", true); - const { ws } = await openClient(); - - const mainDelete = await rpcReq(ws, "sessions.delete", { key: "main" }); + const mainDelete = await directSessionReq("sessions.delete", { key: "main" }); expect(mainDelete.ok).toBe(false); - const deleted = await rpcReq<{ ok: true; deleted: boolean }>(ws, "sessions.delete", { + const deleted = await directSessionReq<{ ok: true; deleted: boolean }>("sessions.delete", { key: "discord:group:dev", }); expect(deleted.ok).toBe(true); @@ -2461,8 +2481,6 @@ describe("gateway server sessions", () => { targetSessionKey: "agent:main:discord:group:dev", reason: "session-delete", }); - - ws.close(); }); test("sessions.delete closes ACP runtime handles before removing ACP sessions", async () => { @@ -2487,8 +2505,7 @@ describe("gateway server sessions", () => { }, }, }); - const { ws } = await openClient(); - const deleted = await rpcReq<{ ok: true; deleted: boolean }>(ws, "sessions.delete", { + const deleted = await directSessionReq<{ ok: true; deleted: boolean }>("sessions.delete", { key: "discord:group:dev", }); expect(deleted.ok).toBe(true); @@ -2506,8 +2523,6 @@ describe("gateway server sessions", () => { reason: "session-delete", sessionKey: "agent:main:discord:group:dev", }); - - ws.close(); }); test("sessions.delete emits session_end with deleted reason and no replacement", async () => { @@ -2535,8 +2550,7 @@ describe("gateway server sessions", () => { }, }); - const { ws } = await openClient(); - const deleted = await rpcReq<{ ok: true; deleted: boolean }>(ws, "sessions.delete", { + const deleted = await directSessionReq<{ ok: true; deleted: boolean }>("sessions.delete", { key: "discord:group:delete", }); expect(deleted.ok).toBe(true); @@ -2562,7 +2576,6 @@ describe("gateway server sessions", () => { sessionKey: "agent:main:discord:group:delete", agentId: "main", }); - ws.close(); }); test("sessions.delete does not emit lifecycle events when nothing was deleted", async () => { @@ -2574,8 +2587,7 @@ describe("gateway server sessions", () => { }, }); - const { ws } = await openClient(); - const deleted = await rpcReq<{ ok: true; deleted: boolean }>(ws, "sessions.delete", { + const deleted = await directSessionReq<{ ok: true; deleted: boolean }>("sessions.delete", { key: "agent:main:subagent:missing", }); @@ -2583,8 +2595,6 @@ describe("gateway server sessions", () => { expect(deleted.payload?.deleted).toBe(false); expect(subagentLifecycleHookMocks.runSubagentEnded).not.toHaveBeenCalled(); expect(threadBindingMocks.unbindThreadBindingsBySessionKey).not.toHaveBeenCalled(); - - ws.close(); }); test("sessions.delete emits subagent targetKind for subagent sessions", async () => { @@ -2599,8 +2609,7 @@ describe("gateway server sessions", () => { }, }); - const { ws } = await openClient(); - const deleted = await rpcReq<{ ok: true; deleted: boolean }>(ws, "sessions.delete", { + const deleted = await directSessionReq<{ ok: true; deleted: boolean }>("sessions.delete", { key: "agent:main:subagent:worker", }); expect(deleted.ok).toBe(true); @@ -2622,8 +2631,6 @@ describe("gateway server sessions", () => { targetSessionKey: "agent:main:subagent:worker", reason: "session-delete", }); - - ws.close(); }); test("sessions.delete can skip lifecycle hooks while still unbinding thread bindings", async () => { @@ -2638,8 +2645,7 @@ describe("gateway server sessions", () => { }, }); - const { ws } = await openClient(); - const deleted = await rpcReq<{ ok: true; deleted: boolean }>(ws, "sessions.delete", { + const deleted = await directSessionReq<{ ok: true; deleted: boolean }>("sessions.delete", { key: "agent:main:subagent:worker", emitLifecycleHooks: false, }); @@ -2651,8 +2657,6 @@ describe("gateway server sessions", () => { targetSessionKey: "agent:main:subagent:worker", reason: "session-delete", }); - - ws.close(); }); test("sessions.delete directly unbinds thread bindings when hooks are unavailable", async () => { @@ -2668,8 +2672,7 @@ describe("gateway server sessions", () => { }); subagentLifecycleHookState.hasSubagentEndedHook = false; - const { ws } = await openClient(); - const deleted = await rpcReq<{ ok: true; deleted: boolean }>(ws, "sessions.delete", { + const deleted = await directSessionReq<{ ok: true; deleted: boolean }>("sessions.delete", { key: "agent:main:subagent:worker", }); expect(deleted.ok).toBe(true); @@ -2679,8 +2682,6 @@ describe("gateway server sessions", () => { targetSessionKey: "agent:main:subagent:worker", reason: "session-delete", }); - - ws.close(); }); test("sessions.reset aborts active runs and clears queues", async () => { @@ -2693,10 +2694,7 @@ describe("gateway server sessions", () => { embeddedRunMock.activeIds.add("sess-main"); embeddedRunMock.waitResults.set("sess-main", true); - const { ws } = await openClient(); - - const reset = await rpcReq<{ ok: true; key: string; entry: { sessionId: string } }>( - ws, + const reset = await directSessionReq<{ ok: true; key: string; entry: { sessionId: string } }>( "sessions.reset", { key: "main", @@ -2735,8 +2733,6 @@ describe("gateway server sessions", () => { targetSessionKey: "agent:main:main", reason: "session-reset", }); - - ws.close(); }); test("sessions.reset closes ACP runtime handles for ACP sessions", async () => { @@ -2778,8 +2774,7 @@ describe("gateway server sessions", () => { }, }, }); - const { ws } = await openClient(); - const reset = await rpcReq<{ + const reset = await directSessionReq<{ ok: true; key: string; entry: { @@ -2801,7 +2796,7 @@ describe("gateway server sessions", () => { state?: string; }; }; - }>(ws, "sessions.reset", { + }>("sessions.reset", { key: "main", }); expect(reset.ok).toBe(true); @@ -2872,8 +2867,6 @@ describe("gateway server sessions", () => { state: "idle", }); expect(store["agent:main:main"]?.acp?.identity?.acpxSessionId).toBeUndefined(); - - ws.close(); }); test("sessions.reset does not emit lifecycle events when key does not exist", async () => { @@ -2885,20 +2878,17 @@ describe("gateway server sessions", () => { }, }); - const { ws } = await openClient(); - const reset = await rpcReq<{ ok: true; key: string; entry: { sessionId: string } }>( - ws, - "sessions.reset", - { - key: "agent:main:subagent:missing", - }, - ); + const reset = await directSessionReq<{ + ok: true; + key: string; + entry: { sessionId: string }; + }>("sessions.reset", { + key: "agent:main:subagent:missing", + }); expect(reset.ok).toBe(true); expect(subagentLifecycleHookMocks.runSubagentEnded).not.toHaveBeenCalled(); expect(threadBindingMocks.unbindThreadBindingsBySessionKey).not.toHaveBeenCalled(); - - ws.close(); }); test("sessions.reset emits subagent targetKind for subagent sessions", async () => { @@ -2913,14 +2903,13 @@ describe("gateway server sessions", () => { }, }); - const { ws } = await openClient(); - const reset = await rpcReq<{ ok: true; key: string; entry: { sessionId: string } }>( - ws, - "sessions.reset", - { - key: "agent:main:subagent:worker", - }, - ); + const reset = await directSessionReq<{ + ok: true; + key: string; + entry: { sessionId: string }; + }>("sessions.reset", { + key: "agent:main:subagent:worker", + }); expect(reset.ok).toBe(true); expect(reset.payload?.key).toBe("agent:main:subagent:worker"); expect(reset.payload?.entry.sessionId).not.toBe("sess-subagent"); @@ -2941,8 +2930,6 @@ describe("gateway server sessions", () => { targetSessionKey: "agent:main:subagent:worker", reason: "session-reset", }); - - ws.close(); }); test("sessions.reset directly unbinds thread bindings when hooks are unavailable", async () => { @@ -2958,8 +2945,7 @@ describe("gateway server sessions", () => { }); subagentLifecycleHookState.hasSubagentEndedHook = false; - const { ws } = await openClient(); - const reset = await rpcReq<{ ok: true; key: string }>(ws, "sessions.reset", { + const reset = await directSessionReq<{ ok: true; key: string }>("sessions.reset", { key: "main", }); expect(reset.ok).toBe(true); @@ -2969,8 +2955,6 @@ describe("gateway server sessions", () => { targetSessionKey: "agent:main:main", reason: "session-reset", }); - - ws.close(); }); test("sessions.reset emits internal command hook with reason", async () => { @@ -2983,8 +2967,7 @@ describe("gateway server sessions", () => { }, }); - const { ws } = await openClient(); - const reset = await rpcReq<{ ok: true; key: string }>(ws, "sessions.reset", { + const reset = await directSessionReq<{ ok: true; key: string }>("sessions.reset", { key: "main", reason: "new", }); @@ -3020,7 +3003,6 @@ describe("gateway server sessions", () => { }, }); expect(event.context?.previousSessionEntry).toMatchObject({ sessionId: "sess-main" }); - ws.close(); }); test("sessions.reset emits before_reset hook with transcript context", async () => { @@ -3048,8 +3030,7 @@ describe("gateway server sessions", () => { beforeResetHookState.hasBeforeResetHook = true; - const { ws } = await openClient(); - const reset = await rpcReq<{ ok: true; key: string }>(ws, "sessions.reset", { + const reset = await directSessionReq<{ ok: true; key: string }>("sessions.reset", { key: "main", reason: "new", }); @@ -3073,7 +3054,6 @@ describe("gateway server sessions", () => { sessionKey: "agent:main:main", sessionId: "sess-main", }); - ws.close(); }); test("sessions.reset emits enriched session_end and session_start hooks", async () => { @@ -3099,8 +3079,7 @@ describe("gateway server sessions", () => { }, }); - const { ws } = await openClient(); - const reset = await rpcReq<{ ok: true; key: string }>(ws, "sessions.reset", { + const reset = await directSessionReq<{ ok: true; key: string }>("sessions.reset", { key: "main", reason: "new", }); @@ -3141,7 +3120,6 @@ describe("gateway server sessions", () => { sessionKey: "agent:main:main", agentId: "main", }); - ws.close(); }); test("sessions.reset returns unavailable when active run does not stop", async () => { @@ -3155,9 +3133,7 @@ describe("gateway server sessions", () => { embeddedRunMock.activeIds.add("sess-main"); embeddedRunMock.waitResults.set("sess-main", false); - const { ws } = await openClient(); - - const reset = await rpcReq(ws, "sessions.reset", { + const reset = await directSessionReq("sessions.reset", { key: "main", }); expect(reset.ok).toBe(false); @@ -3179,8 +3155,6 @@ describe("gateway server sessions", () => { expect(store["agent:main:main"]?.sessionId).toBe("sess-main"); const filesAfterResetAttempt = await fs.readdir(dir); expect(filesAfterResetAttempt.some((f) => f.startsWith("sess-main.jsonl.reset."))).toBe(false); - - ws.close(); }); test("sessions.reset emits before_reset for the entry actually reset under the store lock", async () => {