fix(gateway): split conversation reset from admin reset

This commit is contained in:
Peter Steinberger
2026-03-11 02:49:38 +00:00
parent 0ab8d20917
commit c91d1622d5
6 changed files with 456 additions and 420 deletions

View File

@@ -8,7 +8,7 @@ const mocks = vi.hoisted(() => ({
updateSessionStore: vi.fn(),
agentCommand: vi.fn(),
registerAgentRunContext: vi.fn(),
sessionsResetHandler: vi.fn(),
performGatewaySessionReset: vi.fn(),
loadConfigReturn: {} as Record<string, unknown>,
}));
@@ -62,11 +62,9 @@ vi.mock("../../infra/agent-events.js", () => ({
onAgentEvent: vi.fn(),
}));
vi.mock("./sessions.js", () => ({
sessionsHandlers: {
"sessions.reset": (...args: unknown[]) =>
(mocks.sessionsResetHandler as (...args: unknown[]) => unknown)(...args),
},
vi.mock("../session-reset-service.js", () => ({
performGatewaySessionReset: (...args: unknown[]) =>
(mocks.performGatewaySessionReset as (...args: unknown[]) => unknown)(...args),
}));
vi.mock("../../sessions/send-policy.js", () => ({
@@ -158,7 +156,7 @@ function resetTimeConfig() {
async function expectResetCall(expectedMessage: string) {
await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled());
expect(mocks.sessionsResetHandler).toHaveBeenCalledTimes(1);
expect(mocks.performGatewaySessionReset).toHaveBeenCalledTimes(1);
const call = readLastAgentCommandCall();
expect(call?.message).toBe(expectedMessage);
return call;
@@ -208,18 +206,16 @@ function mockSessionResetSuccess(params: {
}) {
const key = params.key ?? "agent:main:main";
const sessionId = params.sessionId ?? "reset-session-id";
mocks.sessionsResetHandler.mockImplementation(
async (opts: {
params: { key: string; reason: string };
respond: (ok: boolean, payload?: unknown) => void;
}) => {
expect(opts.params.key).toBe(key);
expect(opts.params.reason).toBe(params.reason);
opts.respond(true, {
mocks.performGatewaySessionReset.mockImplementation(
async (opts: { key: string; reason: string; commandSource: string }) => {
expect(opts.key).toBe(key);
expect(opts.reason).toBe(params.reason);
expect(opts.commandSource).toBe("gateway:agent");
return {
ok: true,
key,
entry: { sessionId },
});
};
},
);
}
@@ -560,7 +556,7 @@ describe("gateway agent handler", () => {
);
await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled());
expect(mocks.sessionsResetHandler).toHaveBeenCalledTimes(1);
expect(mocks.performGatewaySessionReset).toHaveBeenCalledTimes(1);
const call = readLastAgentCommandCall();
// Message is now dynamically built with current date — check key substrings
expect(call?.message).toContain("Execute your Session Startup sequence now");
@@ -572,7 +568,7 @@ describe("gateway agent handler", () => {
it("uses /reset suffix as the post-reset message and still injects timestamp", async () => {
setupNewYorkTimeConfig("2026-01-29T01:30:00.000Z");
mockSessionResetSuccess({ reason: "reset" });
mocks.sessionsResetHandler.mockClear();
mocks.performGatewaySessionReset.mockClear();
primeMainAgentRun({
sessionId: "reset-session-id",
cfg: mocks.loadConfigReturn,

View File

@@ -46,6 +46,7 @@ import {
validateAgentParams,
validateAgentWaitParams,
} from "../protocol/index.js";
import { performGatewaySessionReset } from "../session-reset-service.js";
import {
canonicalizeSpawnedByForAgent,
loadSessionEntry,
@@ -62,7 +63,6 @@ import {
waitForTerminalGatewayDedupe,
} from "./agent-wait-dedupe.js";
import { normalizeRpcAttachmentsToChatAttachments } from "./attachment-normalize.js";
import { sessionsHandlers } from "./sessions.js";
import type { GatewayRequestHandlerOptions, GatewayRequestHandlers } from "./types.js";
const RESET_COMMAND_RE = /^\/(new|reset)(?:\s+([\s\S]*))?$/i;
@@ -72,101 +72,26 @@ function resolveSenderIsOwnerFromClient(client: GatewayRequestHandlerOptions["cl
return scopes.includes(ADMIN_SCOPE);
}
function isGatewayErrorShape(value: unknown): value is { code: string; message: string } {
if (!value || typeof value !== "object") {
return false;
}
const candidate = value as { code?: unknown; message?: unknown };
return typeof candidate.code === "string" && typeof candidate.message === "string";
}
async function runSessionResetFromAgent(params: {
key: string;
reason: "new" | "reset";
idempotencyKey: string;
context: GatewayRequestHandlerOptions["context"];
client: GatewayRequestHandlerOptions["client"];
isWebchatConnect: GatewayRequestHandlerOptions["isWebchatConnect"];
}): Promise<
| { ok: true; key: string; sessionId?: string }
| { ok: false; error: ReturnType<typeof errorShape> }
> {
return await new Promise((resolve) => {
let settled = false;
const settle = (
result:
| { ok: true; key: string; sessionId?: string }
| { ok: false; error: ReturnType<typeof errorShape> },
) => {
if (settled) {
return;
}
settled = true;
resolve(result);
};
const respond: GatewayRequestHandlerOptions["respond"] = (ok, payload, error) => {
if (!ok) {
settle({
ok: false,
error: isGatewayErrorShape(error)
? error
: errorShape(ErrorCodes.UNAVAILABLE, String(error ?? "sessions.reset failed")),
});
return;
}
const payloadObj = payload as
| {
key?: unknown;
entry?: {
sessionId?: unknown;
};
}
| undefined;
const key = typeof payloadObj?.key === "string" ? payloadObj.key : params.key;
const sessionId =
payloadObj?.entry && typeof payloadObj.entry.sessionId === "string"
? payloadObj.entry.sessionId
: undefined;
settle({ ok: true, key, sessionId });
};
const resetResult = sessionsHandlers["sessions.reset"]({
req: {
type: "req",
id: `${params.idempotencyKey}:reset`,
method: "sessions.reset",
},
params: {
key: params.key,
reason: params.reason,
},
context: params.context,
client: params.client,
isWebchatConnect: params.isWebchatConnect,
respond,
});
void (async () => {
try {
await resetResult;
if (!settled) {
settle({
ok: false,
error: errorShape(
ErrorCodes.UNAVAILABLE,
"sessions.reset completed without returning a response",
),
});
}
} catch (err: unknown) {
settle({
ok: false,
error: errorShape(ErrorCodes.UNAVAILABLE, String(err)),
});
}
})();
const result = await performGatewaySessionReset({
key: params.key,
reason: params.reason,
commandSource: "gateway:agent",
});
if (!result.ok) {
return result;
}
return {
ok: true,
key: result.key,
sessionId: result.entry.sessionId,
};
}
function dispatchAgentRunFromGateway(params: {
@@ -399,10 +324,6 @@ export const agentHandlers: GatewayRequestHandlers = {
const resetResult = await runSessionResetFromAgent({
key: requestedSessionKey,
reason: resetReason,
idempotencyKey: idem,
context,
client,
isWebchatConnect,
});
if (!resetResult.ok) {
respond(false, undefined, resetResult.error);

View File

@@ -1,29 +1,13 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import { getAcpSessionManager } from "../../acp/control-plane/manager.js";
import { resolveDefaultAgentId } from "../../agents/agent-scope.js";
import { clearBootstrapSnapshot } from "../../agents/bootstrap-cache.js";
import { abortEmbeddedPiRun, waitForEmbeddedPiRunEnd } from "../../agents/pi-embedded.js";
import { stopSubagentsForRequester } from "../../auto-reply/reply/abort.js";
import { clearSessionQueues } from "../../auto-reply/reply/queue.js";
import { closeTrackedBrowserTabsForSessions } from "../../browser/session-tab-registry.js";
import { loadConfig } from "../../config/config.js";
import {
loadSessionStore,
snapshotSessionOrigin,
resolveMainSessionKey,
type SessionEntry,
updateSessionStore,
} from "../../config/sessions.js";
import { unbindThreadBindingsBySessionKey } from "../../discord/monitor/thread-bindings.js";
import { logVerbose } from "../../globals.js";
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import {
isSubagentSessionKey,
normalizeAgentId,
parseAgentSessionKey,
} from "../../routing/session-key.js";
import { normalizeAgentId, parseAgentSessionKey } from "../../routing/session-key.js";
import { GATEWAY_CLIENT_IDS } from "../protocol/client-info.js";
import {
ErrorCodes,
@@ -36,9 +20,14 @@ import {
validateSessionsResetParams,
validateSessionsResolveParams,
} from "../protocol/index.js";
import {
archiveSessionTranscriptsForSession,
cleanupSessionBeforeMutation,
emitSessionUnboundLifecycleEvent,
performGatewaySessionReset,
} from "../session-reset-service.js";
import {
archiveFileOnDisk,
archiveSessionTranscripts,
listSessionsFromStore,
loadCombinedSessionStoreForGateway,
loadSessionEntry,
@@ -128,219 +117,6 @@ function migrateAndPruneSessionStoreKey(params: {
return { target, primaryKey, entry: params.store[primaryKey] };
}
function stripRuntimeModelState(entry?: SessionEntry): SessionEntry | undefined {
if (!entry) {
return entry;
}
return {
...entry,
model: undefined,
modelProvider: undefined,
contextTokens: undefined,
systemPromptReport: undefined,
};
}
function archiveSessionTranscriptsForSession(params: {
sessionId: string | undefined;
storePath: string;
sessionFile?: string;
agentId?: string;
reason: "reset" | "deleted";
}): string[] {
if (!params.sessionId) {
return [];
}
return archiveSessionTranscripts({
sessionId: params.sessionId,
storePath: params.storePath,
sessionFile: params.sessionFile,
agentId: params.agentId,
reason: params.reason,
});
}
async function emitSessionUnboundLifecycleEvent(params: {
targetSessionKey: string;
reason: "session-reset" | "session-delete";
emitHooks?: boolean;
}) {
const targetKind = isSubagentSessionKey(params.targetSessionKey) ? "subagent" : "acp";
unbindThreadBindingsBySessionKey({
targetSessionKey: params.targetSessionKey,
targetKind,
reason: params.reason,
sendFarewell: true,
});
if (params.emitHooks === false) {
return;
}
const hookRunner = getGlobalHookRunner();
if (!hookRunner?.hasHooks("subagent_ended")) {
return;
}
await hookRunner.runSubagentEnded(
{
targetSessionKey: params.targetSessionKey,
targetKind,
reason: params.reason,
sendFarewell: true,
outcome: params.reason === "session-reset" ? "reset" : "deleted",
},
{
childSessionKey: params.targetSessionKey,
},
);
}
async function ensureSessionRuntimeCleanup(params: {
cfg: ReturnType<typeof loadConfig>;
key: string;
target: ReturnType<typeof resolveGatewaySessionStoreTarget>;
sessionId?: string;
}) {
const closeTrackedBrowserTabs = async () => {
const closeKeys = new Set<string>([
params.key,
params.target.canonicalKey,
...params.target.storeKeys,
params.sessionId ?? "",
]);
return await closeTrackedBrowserTabsForSessions({
sessionKeys: [...closeKeys],
onWarn: (message) => logVerbose(message),
});
};
const queueKeys = new Set<string>(params.target.storeKeys);
queueKeys.add(params.target.canonicalKey);
if (params.sessionId) {
queueKeys.add(params.sessionId);
}
clearSessionQueues([...queueKeys]);
stopSubagentsForRequester({ cfg: params.cfg, requesterSessionKey: params.target.canonicalKey });
if (!params.sessionId) {
clearBootstrapSnapshot(params.target.canonicalKey);
await closeTrackedBrowserTabs();
return undefined;
}
abortEmbeddedPiRun(params.sessionId);
const ended = await waitForEmbeddedPiRunEnd(params.sessionId, 15_000);
clearBootstrapSnapshot(params.target.canonicalKey);
if (ended) {
await closeTrackedBrowserTabs();
return undefined;
}
return errorShape(
ErrorCodes.UNAVAILABLE,
`Session ${params.key} is still active; try again in a moment.`,
);
}
const ACP_RUNTIME_CLEANUP_TIMEOUT_MS = 15_000;
async function runAcpCleanupStep(params: {
op: () => Promise<void>;
}): Promise<{ status: "ok" } | { status: "timeout" } | { status: "error"; error: unknown }> {
let timer: NodeJS.Timeout | undefined;
const timeoutPromise = new Promise<{ status: "timeout" }>((resolve) => {
timer = setTimeout(() => resolve({ status: "timeout" }), ACP_RUNTIME_CLEANUP_TIMEOUT_MS);
});
const opPromise = params
.op()
.then(() => ({ status: "ok" as const }))
.catch((error: unknown) => ({ status: "error" as const, error }));
const outcome = await Promise.race([opPromise, timeoutPromise]);
if (timer) {
clearTimeout(timer);
}
return outcome;
}
async function closeAcpRuntimeForSession(params: {
cfg: ReturnType<typeof loadConfig>;
sessionKey: string;
entry?: SessionEntry;
reason: "session-reset" | "session-delete";
}) {
if (!params.entry?.acp) {
return undefined;
}
const acpManager = getAcpSessionManager();
const cancelOutcome = await runAcpCleanupStep({
op: async () => {
await acpManager.cancelSession({
cfg: params.cfg,
sessionKey: params.sessionKey,
reason: params.reason,
});
},
});
if (cancelOutcome.status === "timeout") {
return errorShape(
ErrorCodes.UNAVAILABLE,
`Session ${params.sessionKey} is still active; try again in a moment.`,
);
}
if (cancelOutcome.status === "error") {
logVerbose(
`sessions.${params.reason}: ACP cancel failed for ${params.sessionKey}: ${String(cancelOutcome.error)}`,
);
}
const closeOutcome = await runAcpCleanupStep({
op: async () => {
await acpManager.closeSession({
cfg: params.cfg,
sessionKey: params.sessionKey,
reason: params.reason,
requireAcpSession: false,
allowBackendUnavailable: true,
});
},
});
if (closeOutcome.status === "timeout") {
return errorShape(
ErrorCodes.UNAVAILABLE,
`Session ${params.sessionKey} is still active; try again in a moment.`,
);
}
if (closeOutcome.status === "error") {
logVerbose(
`sessions.${params.reason}: ACP runtime close failed for ${params.sessionKey}: ${String(closeOutcome.error)}`,
);
}
return undefined;
}
async function cleanupSessionBeforeMutation(params: {
cfg: ReturnType<typeof loadConfig>;
key: string;
target: ReturnType<typeof resolveGatewaySessionStoreTarget>;
entry: SessionEntry | undefined;
legacyKey?: string;
canonicalKey?: string;
reason: "session-reset" | "session-delete";
}) {
const cleanupError = await ensureSessionRuntimeCleanup({
cfg: params.cfg,
key: params.key,
target: params.target,
sessionId: params.entry?.sessionId,
});
if (cleanupError) {
return cleanupError;
}
return await closeAcpRuntimeForSession({
cfg: params.cfg,
sessionKey: params.legacyKey ?? params.canonicalKey ?? params.target.canonicalKey ?? params.key,
entry: params.entry,
reason: params.reason,
});
}
export const sessionsHandlers: GatewayRequestHandlers = {
"sessions.list": ({ params, respond }) => {
if (!assertValidParams(params, validateSessionsListParams, "sessions.list", respond)) {
@@ -486,89 +262,17 @@ export const sessionsHandlers: GatewayRequestHandlers = {
return;
}
const { cfg, target, storePath } = resolveGatewaySessionTargetFromKey(key);
const { entry, legacyKey, canonicalKey } = loadSessionEntry(key);
const hadExistingEntry = Boolean(entry);
const commandReason = p.reason === "new" ? "new" : "reset";
const hookEvent = createInternalHookEvent(
"command",
commandReason,
target.canonicalKey ?? key,
{
sessionEntry: entry,
previousSessionEntry: entry,
commandSource: "gateway:sessions.reset",
cfg,
},
);
await triggerInternalHook(hookEvent);
const mutationCleanupError = await cleanupSessionBeforeMutation({
cfg,
const reason = p.reason === "new" ? "new" : "reset";
const result = await performGatewaySessionReset({
key,
target,
entry,
legacyKey,
canonicalKey,
reason: "session-reset",
reason,
commandSource: "gateway:sessions.reset",
});
if (mutationCleanupError) {
respond(false, undefined, mutationCleanupError);
if (!result.ok) {
respond(false, undefined, result.error);
return;
}
let oldSessionId: string | undefined;
let oldSessionFile: string | undefined;
const next = await updateSessionStore(storePath, (store) => {
const { primaryKey } = migrateAndPruneSessionStoreKey({ cfg, key, store });
const entry = store[primaryKey];
const resetEntry = stripRuntimeModelState(entry);
const parsed = parseAgentSessionKey(primaryKey);
const sessionAgentId = normalizeAgentId(parsed?.agentId ?? resolveDefaultAgentId(cfg));
const resolvedModel = resolveSessionModelRef(cfg, resetEntry, sessionAgentId);
oldSessionId = entry?.sessionId;
oldSessionFile = entry?.sessionFile;
const now = Date.now();
const nextEntry: SessionEntry = {
sessionId: randomUUID(),
updatedAt: now,
systemSent: false,
abortedLastRun: false,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
reasoningLevel: entry?.reasoningLevel,
responseUsage: entry?.responseUsage,
model: resolvedModel.model,
modelProvider: resolvedModel.provider,
contextTokens: resetEntry?.contextTokens,
sendPolicy: entry?.sendPolicy,
label: entry?.label,
origin: snapshotSessionOrigin(entry),
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
skillsSnapshot: entry?.skillsSnapshot,
// Reset token counts to 0 on session reset (#1523)
inputTokens: 0,
outputTokens: 0,
totalTokens: 0,
totalTokensFresh: true,
};
store[primaryKey] = nextEntry;
return nextEntry;
});
// Archive old transcript so it doesn't accumulate on disk (#14869).
archiveSessionTranscriptsForSession({
sessionId: oldSessionId,
storePath,
sessionFile: oldSessionFile,
agentId: target.agentId,
reason: "reset",
});
if (hadExistingEntry) {
await emitSessionUnboundLifecycleEvent({
targetSessionKey: target.canonicalKey ?? key,
reason: "session-reset",
});
}
respond(true, { ok: true, key: target.canonicalKey, entry: next }, undefined);
respond(true, { ok: true, key: result.key, entry: result.entry }, undefined);
},
"sessions.delete": async ({ params, respond, client, isWebchatConnect }) => {
if (!assertValidParams(params, validateSessionsDeleteParams, "sessions.delete", respond)) {