diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index eb66189899d..47f9042ac74 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -278,6 +278,30 @@ export const sessionsHandlers: GatewayRequestHandlers = { const cfg = loadConfig(); const target = resolveGatewaySessionStoreTarget({ cfg, key }); + const { entry } = loadSessionEntry(key); + const sessionId = entry?.sessionId; + const queueKeys = new Set(target.storeKeys); + queueKeys.add(target.canonicalKey); + if (sessionId) { + queueKeys.add(sessionId); + } + clearSessionQueues([...queueKeys]); + stopSubagentsForRequester({ cfg, requesterSessionKey: target.canonicalKey }); + if (sessionId) { + abortEmbeddedPiRun(sessionId); + const ended = await waitForEmbeddedPiRunEnd(sessionId, 15_000); + if (!ended) { + respond( + false, + undefined, + errorShape( + ErrorCodes.UNAVAILABLE, + `Session ${key} is still active; try again in a moment.`, + ), + ); + return; + } + } const storePath = target.storePath; let oldSessionId: string | undefined; let oldSessionFile: string | undefined; diff --git a/src/gateway/server.sessions.gateway-server-sessions-a.e2e.test.ts b/src/gateway/server.sessions.gateway-server-sessions-a.e2e.test.ts index 1eb83fcf7b4..e7718cb8eac 100644 --- a/src/gateway/server.sessions.gateway-server-sessions-a.e2e.test.ts +++ b/src/gateway/server.sessions.gateway-server-sessions-a.e2e.test.ts @@ -597,4 +597,49 @@ describe("gateway server sessions", () => { ws.close(); }); + + test("sessions.reset aborts active runs and clears queues", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-")); + const storePath = path.join(dir, "sessions.json"); + testState.sessionStorePath = storePath; + + await fs.writeFile( + path.join(dir, "sess-main.jsonl"), + `${JSON.stringify({ role: "user", content: "hello" })}\n`, + "utf-8", + ); + + await writeSessionStore({ + entries: { + main: { sessionId: "sess-main", updatedAt: Date.now() }, + }, + }); + + embeddedRunMock.activeIds.add("sess-main"); + embeddedRunMock.waitResults.set("sess-main", true); + + const { ws } = await openClient(); + + const reset = await rpcReq<{ ok: true; key: string; entry: { sessionId: string } }>( + ws, + "sessions.reset", + { + key: "main", + }, + ); + expect(reset.ok).toBe(true); + expect(reset.payload?.key).toBe("agent:main:main"); + expect(reset.payload?.entry.sessionId).not.toBe("sess-main"); + expect(sessionCleanupMocks.stopSubagentsForRequester).toHaveBeenCalledWith({ + cfg: expect.any(Object), + requesterSessionKey: "agent:main:main", + }); + expect(sessionCleanupMocks.clearSessionQueues).toHaveBeenCalledTimes(1); + const clearedKeys = sessionCleanupMocks.clearSessionQueues.mock.calls[0]?.[0] as string[]; + expect(clearedKeys).toEqual(expect.arrayContaining(["main", "agent:main:main", "sess-main"])); + expect(embeddedRunMock.abortCalls).toEqual(["sess-main"]); + expect(embeddedRunMock.waitCalls).toEqual(["sess-main"]); + + ws.close(); + }); });