diff --git a/src/auto-reply/reply/agent-runner-session-reset.test.ts b/src/auto-reply/reply/agent-runner-session-reset.test.ts new file mode 100644 index 00000000000..a8a0869c6c7 --- /dev/null +++ b/src/auto-reply/reply/agent-runner-session-reset.test.ts @@ -0,0 +1,170 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { SessionEntry } from "../../config/sessions.js"; +import { + resetReplyRunSession, + setAgentRunnerSessionResetTestDeps, +} from "./agent-runner-session-reset.js"; +import type { FollowupRun } from "./queue.js"; + +const refreshQueuedFollowupSessionMock = vi.fn(); +const errorMock = vi.fn(); + +function createFollowupRun(): FollowupRun { + return { + prompt: "hello", + summaryLine: "hello", + enqueuedAt: Date.now(), + run: { + sessionId: "session", + sessionKey: "main", + messageProvider: "whatsapp", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + config: {}, + skillsSnapshot: {}, + provider: "anthropic", + model: "claude", + thinkLevel: "low", + verboseLevel: "off", + elevatedLevel: "off", + bashElevated: { enabled: false, allowed: false, defaultLevel: "off" }, + timeoutMs: 1_000, + blockReplyBreak: "message_end", + }, + } as unknown as FollowupRun; +} + +async function writeSessionStore( + storePath: string, + sessionKey: string, + entry: SessionEntry, +): Promise { + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile(storePath, JSON.stringify({ [sessionKey]: entry }, null, 2), "utf8"); +} + +describe("resetReplyRunSession", () => { + let rootDir = ""; + + beforeEach(async () => { + rootDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-reset-run-")); + refreshQueuedFollowupSessionMock.mockReset(); + errorMock.mockReset(); + setAgentRunnerSessionResetTestDeps({ + generateSecureUuid: () => "00000000-0000-0000-0000-000000000123", + refreshQueuedFollowupSession: refreshQueuedFollowupSessionMock as never, + error: errorMock, + }); + }); + + afterEach(async () => { + setAgentRunnerSessionResetTestDeps(); + await fs.rm(rootDir, { recursive: true, force: true }); + }); + + it("rotates the session and clears stale runtime and fallback fields", async () => { + const storePath = path.join(rootDir, "sessions.json"); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: 1, + sessionFile: path.join(rootDir, "session.jsonl"), + modelProvider: "qwencode", + model: "qwen", + contextTokens: 123, + fallbackNoticeSelectedModel: "anthropic/claude", + fallbackNoticeActiveModel: "openai/gpt", + fallbackNoticeReason: "rate limit", + systemPromptReport: { + source: "run", + generatedAt: 1, + systemPrompt: { chars: 1, projectContextChars: 0, nonProjectContextChars: 1 }, + injectedWorkspaceFiles: [], + skills: { promptChars: 0, entries: [] }, + tools: { listChars: 0, schemaChars: 0, entries: [] }, + }, + }; + const sessionStore = { main: sessionEntry }; + const followupRun = createFollowupRun(); + await writeSessionStore(storePath, "main", sessionEntry); + + let activeSessionEntry: SessionEntry | undefined = sessionEntry; + let isNewSession = false; + const reset = await resetReplyRunSession({ + options: { + failureLabel: "compaction failure", + buildLogMessage: (next) => `reset ${next}`, + }, + sessionKey: "main", + queueKey: "main", + activeSessionEntry, + activeSessionStore: sessionStore, + storePath, + followupRun, + onActiveSessionEntry: (entry) => { + activeSessionEntry = entry; + }, + onNewSession: () => { + isNewSession = true; + }, + }); + + expect(reset).toBe(true); + expect(isNewSession).toBe(true); + expect(activeSessionEntry?.sessionId).toBe("00000000-0000-0000-0000-000000000123"); + expect(followupRun.run.sessionId).toBe(activeSessionEntry?.sessionId); + expect(activeSessionEntry?.modelProvider).toBeUndefined(); + expect(activeSessionEntry?.model).toBeUndefined(); + expect(activeSessionEntry?.contextTokens).toBeUndefined(); + expect(activeSessionEntry?.fallbackNoticeSelectedModel).toBeUndefined(); + expect(activeSessionEntry?.fallbackNoticeActiveModel).toBeUndefined(); + expect(activeSessionEntry?.fallbackNoticeReason).toBeUndefined(); + expect(activeSessionEntry?.systemPromptReport).toBeUndefined(); + expect(refreshQueuedFollowupSessionMock).toHaveBeenCalledWith({ + key: "main", + previousSessionId: "session", + nextSessionId: activeSessionEntry?.sessionId, + nextSessionFile: activeSessionEntry?.sessionFile, + }); + expect(errorMock).toHaveBeenCalledWith("reset 00000000-0000-0000-0000-000000000123"); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { + main: SessionEntry; + }; + expect(persisted.main.sessionId).toBe(activeSessionEntry?.sessionId); + expect(persisted.main.fallbackNoticeReason).toBeUndefined(); + }); + + it("cleans up the old transcript when requested", async () => { + const storePath = path.join(rootDir, "sessions.json"); + const oldTranscriptPath = path.join(rootDir, "old-session.jsonl"); + await fs.writeFile(oldTranscriptPath, "old", "utf8"); + const sessionEntry: SessionEntry = { + sessionId: "old-session", + updatedAt: 1, + sessionFile: oldTranscriptPath, + }; + const sessionStore = { main: sessionEntry }; + await writeSessionStore(storePath, "main", sessionEntry); + + await resetReplyRunSession({ + options: { + failureLabel: "role ordering conflict", + cleanupTranscripts: true, + buildLogMessage: (next) => `reset ${next}`, + }, + sessionKey: "main", + queueKey: "main", + activeSessionEntry: sessionEntry, + activeSessionStore: sessionStore, + storePath, + followupRun: createFollowupRun(), + onActiveSessionEntry: () => {}, + onNewSession: () => {}, + }); + + await expect(fs.access(oldTranscriptPath)).rejects.toThrow(); + }); +}); diff --git a/src/auto-reply/reply/agent-runner-session-reset.ts b/src/auto-reply/reply/agent-runner-session-reset.ts new file mode 100644 index 00000000000..dbb6dd8c017 --- /dev/null +++ b/src/auto-reply/reply/agent-runner-session-reset.ts @@ -0,0 +1,127 @@ +import fs from "node:fs"; +import type { SessionEntry } from "../../config/sessions.js"; +import { + resolveAgentIdFromSessionKey, + resolveSessionFilePath, + resolveSessionFilePathOptions, + resolveSessionTranscriptPath, + updateSessionStore, +} from "../../config/sessions.js"; +import { generateSecureUuid } from "../../infra/secure-random.js"; +import { defaultRuntime } from "../../runtime.js"; +import { refreshQueuedFollowupSession, type FollowupRun } from "./queue.js"; + +type ResetSessionOptions = { + failureLabel: string; + buildLogMessage: (nextSessionId: string) => string; + cleanupTranscripts?: boolean; +}; + +const deps = { + generateSecureUuid, + updateSessionStore, + refreshQueuedFollowupSession, + error: (message: string) => defaultRuntime.error(message), +}; + +export function setAgentRunnerSessionResetTestDeps(overrides?: Partial): void { + Object.assign(deps, { + generateSecureUuid, + updateSessionStore, + refreshQueuedFollowupSession, + error: (message: string) => defaultRuntime.error(message), + ...overrides, + }); +} + +export async function resetReplyRunSession(params: { + options: ResetSessionOptions; + sessionKey?: string; + queueKey: string; + activeSessionEntry?: SessionEntry; + activeSessionStore?: Record; + storePath?: string; + messageThreadId?: string; + followupRun: FollowupRun; + onActiveSessionEntry: (entry: SessionEntry) => void; + onNewSession: (newSessionId: string, nextSessionFile: string) => void; +}): Promise { + if (!params.sessionKey || !params.activeSessionStore || !params.storePath) { + return false; + } + const prevEntry = params.activeSessionStore[params.sessionKey] ?? params.activeSessionEntry; + if (!prevEntry) { + return false; + } + const prevSessionId = params.options.cleanupTranscripts ? prevEntry.sessionId : undefined; + const nextSessionId = deps.generateSecureUuid(); + const nextEntry: SessionEntry = { + ...prevEntry, + sessionId: nextSessionId, + updatedAt: Date.now(), + systemSent: false, + abortedLastRun: false, + modelProvider: undefined, + model: undefined, + inputTokens: undefined, + outputTokens: undefined, + totalTokens: undefined, + totalTokensFresh: false, + estimatedCostUsd: undefined, + cacheRead: undefined, + cacheWrite: undefined, + contextTokens: undefined, + systemPromptReport: undefined, + fallbackNoticeSelectedModel: undefined, + fallbackNoticeActiveModel: undefined, + fallbackNoticeReason: undefined, + }; + const agentId = resolveAgentIdFromSessionKey(params.sessionKey); + const nextSessionFile = resolveSessionTranscriptPath( + nextSessionId, + agentId, + params.messageThreadId, + ); + nextEntry.sessionFile = nextSessionFile; + params.activeSessionStore[params.sessionKey] = nextEntry; + try { + await deps.updateSessionStore(params.storePath, (store) => { + store[params.sessionKey!] = nextEntry; + }); + } catch (err) { + deps.error( + `Failed to persist session reset after ${params.options.failureLabel} (${params.sessionKey}): ${String(err)}`, + ); + } + params.followupRun.run.sessionId = nextSessionId; + params.followupRun.run.sessionFile = nextSessionFile; + deps.refreshQueuedFollowupSession({ + key: params.queueKey, + previousSessionId: prevEntry.sessionId, + nextSessionId, + nextSessionFile, + }); + params.onActiveSessionEntry(nextEntry); + params.onNewSession(nextSessionId, nextSessionFile); + deps.error(params.options.buildLogMessage(nextSessionId)); + if (params.options.cleanupTranscripts && prevSessionId) { + const transcriptCandidates = new Set(); + const resolved = resolveSessionFilePath( + prevSessionId, + prevEntry, + resolveSessionFilePathOptions({ agentId, storePath: params.storePath }), + ); + if (resolved) { + transcriptCandidates.add(resolved); + } + transcriptCandidates.add(resolveSessionTranscriptPath(prevSessionId, agentId)); + for (const candidate of transcriptCandidates) { + try { + fs.unlinkSync(candidate); + } catch { + // Best-effort cleanup. + } + } + } + return true; +} diff --git a/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts b/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts index 481b26b2bc3..c0eec929550 100644 --- a/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts +++ b/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts @@ -1,10 +1,6 @@ -import fs from "node:fs/promises"; -import path from "node:path"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import type { SessionEntry } from "../../config/sessions.js"; -import * as sessions from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; -import { withStateDirEnv } from "../../test-helpers/state-dir-env.js"; import type { TemplateContext } from "../templating.js"; import type { GetReplyOptions } from "../types.js"; import { @@ -261,34 +257,6 @@ describe("runReplyAgent heartbeat followup guard", () => { }); describe("runReplyAgent typing (heartbeat)", () => { - async function withTempStateDir(fn: (stateDir: string) => Promise): Promise { - return await withStateDirEnv( - "openclaw-typing-heartbeat-", - async ({ stateDir }) => await fn(stateDir), - ); - } - - async function writeCorruptGeminiSessionFixture(params: { - stateDir: string; - sessionId: string; - persistStore: boolean; - }) { - const storePath = path.join(params.stateDir, "sessions", "sessions.json"); - const sessionEntry: SessionEntry = { sessionId: params.sessionId, updatedAt: Date.now() }; - const sessionStore = { main: sessionEntry }; - - await fs.mkdir(path.dirname(storePath), { recursive: true }); - if (params.persistStore) { - await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); - } - - const transcriptPath = sessions.resolveSessionTranscriptPath(params.sessionId); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - await fs.writeFile(transcriptPath, "bad", "utf-8"); - - return { storePath, sessionEntry, sessionStore, transcriptPath }; - } - it("signals typing for normal runs", async () => { const onPartialReply = vi.fn(); state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { @@ -1065,189 +1033,6 @@ describe("runReplyAgent typing (heartbeat)", () => { } }); - it("retries after compaction failure by resetting the session", async () => { - await withTempStateDir(async (stateDir) => { - const sessionId = "session"; - const storePath = path.join(stateDir, "sessions", "sessions.json"); - const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); - const sessionEntry: SessionEntry = { - sessionId, - updatedAt: Date.now(), - sessionFile: transcriptPath, - fallbackNoticeSelectedModel: "fireworks/accounts/fireworks/routers/kimi-k2p5-turbo", - fallbackNoticeActiveModel: "deepinfra/moonshotai/Kimi-K2.5", - fallbackNoticeReason: "rate limit", - }; - const sessionStore = { main: sessionEntry }; - - await fs.mkdir(path.dirname(storePath), { recursive: true }); - await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - await fs.writeFile(transcriptPath, "ok", "utf-8"); - - state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => { - throw new Error( - 'Context overflow: Summarization failed: 400 {"message":"prompt is too long"}', - ); - }); - - const { run } = createMinimalRun({ - sessionEntry, - sessionStore, - sessionKey: "main", - storePath, - }); - const res = await run(); - - expect(state.runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); - const payload = Array.isArray(res) ? res[0] : res; - expect(payload).toMatchObject({ - text: expect.stringContaining("Context limit exceeded during compaction"), - }); - if (!payload) { - throw new Error("expected payload"); - } - expect(payload.text?.toLowerCase()).toContain("reset"); - expect(sessionStore.main.sessionId).not.toBe(sessionId); - expect(sessionStore.main.fallbackNoticeSelectedModel).toBeUndefined(); - expect(sessionStore.main.fallbackNoticeActiveModel).toBeUndefined(); - expect(sessionStore.main.fallbackNoticeReason).toBeUndefined(); - - const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId); - expect(persisted.main.fallbackNoticeSelectedModel).toBeUndefined(); - expect(persisted.main.fallbackNoticeActiveModel).toBeUndefined(); - expect(persisted.main.fallbackNoticeReason).toBeUndefined(); - }); - }); - - it("retries after context overflow payload by resetting the session", async () => { - await withTempStateDir(async (stateDir) => { - const sessionId = "session"; - const storePath = path.join(stateDir, "sessions", "sessions.json"); - const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); - const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath }; - const sessionStore = { main: sessionEntry }; - - await fs.mkdir(path.dirname(storePath), { recursive: true }); - await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - await fs.writeFile(transcriptPath, "ok", "utf-8"); - - state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({ - payloads: [{ text: "Context overflow: prompt too large", isError: true }], - meta: { - durationMs: 1, - error: { - kind: "context_overflow", - message: 'Context overflow: Summarization failed: 400 {"message":"prompt is too long"}', - }, - }, - })); - - const { run } = createMinimalRun({ - sessionEntry, - sessionStore, - sessionKey: "main", - storePath, - }); - const res = await run(); - - expect(state.runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); - const payload = Array.isArray(res) ? res[0] : res; - expect(payload).toMatchObject({ - text: expect.stringContaining("Context limit exceeded"), - }); - if (!payload) { - throw new Error("expected payload"); - } - expect(payload.text?.toLowerCase()).toContain("reset"); - expect(sessionStore.main.sessionId).not.toBe(sessionId); - expect(vi.mocked(refreshQueuedFollowupSession)).toHaveBeenCalledWith({ - key: "main", - previousSessionId: sessionId, - nextSessionId: sessionStore.main.sessionId, - nextSessionFile: expect.stringContaining(".jsonl"), - }); - - const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId); - }); - }); - - it("clears stale runtime model fields when resetSession retries after compaction failure", async () => { - await withTempStateDir(async (stateDir) => { - const sessionId = "session-stale-model"; - const storePath = path.join(stateDir, "sessions", "sessions.json"); - const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); - const sessionEntry: SessionEntry = { - sessionId, - updatedAt: Date.now(), - sessionFile: transcriptPath, - modelProvider: "qwencode", - model: "qwen3.5-plus-2026-02-15", - contextTokens: 123456, - systemPromptReport: { - source: "run", - generatedAt: Date.now(), - sessionId, - sessionKey: "main", - provider: "qwencode", - model: "qwen3.5-plus-2026-02-15", - workspaceDir: stateDir, - bootstrapMaxChars: 1000, - bootstrapTotalMaxChars: 2000, - systemPrompt: { - chars: 10, - projectContextChars: 5, - nonProjectContextChars: 5, - }, - injectedWorkspaceFiles: [], - skills: { - promptChars: 0, - entries: [], - }, - tools: { - listChars: 0, - schemaChars: 0, - entries: [], - }, - }, - }; - const sessionStore = { main: sessionEntry }; - - await fs.mkdir(path.dirname(storePath), { recursive: true }); - await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - await fs.writeFile(transcriptPath, "ok", "utf-8"); - - state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => { - throw new Error( - 'Context overflow: Summarization failed: 400 {"message":"prompt is too long"}', - ); - }); - - const { run } = createMinimalRun({ - sessionEntry, - sessionStore, - sessionKey: "main", - storePath, - }); - await run(); - - expect(sessionStore.main.modelProvider).toBeUndefined(); - expect(sessionStore.main.model).toBeUndefined(); - expect(sessionStore.main.contextTokens).toBeUndefined(); - expect(sessionStore.main.systemPromptReport).toBeUndefined(); - - const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(persisted.main.modelProvider).toBeUndefined(); - expect(persisted.main.model).toBeUndefined(); - expect(persisted.main.contextTokens).toBeUndefined(); - expect(persisted.main.systemPromptReport).toBeUndefined(); - }); - }); - it("surfaces overflow fallback when embedded run returns empty payloads", async () => { state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({ payloads: [], @@ -1296,163 +1081,6 @@ describe("runReplyAgent typing (heartbeat)", () => { expect(payload.text).toContain("/new"); }); - it("resets the session after role ordering payloads", async () => { - await withTempStateDir(async (stateDir) => { - const sessionId = "session"; - const storePath = path.join(stateDir, "sessions", "sessions.json"); - const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); - const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath }; - const sessionStore = { main: sessionEntry }; - - await fs.mkdir(path.dirname(storePath), { recursive: true }); - await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - await fs.writeFile(transcriptPath, "ok", "utf-8"); - - state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({ - payloads: [{ text: "Message ordering conflict - please try again.", isError: true }], - meta: { - durationMs: 1, - error: { - kind: "role_ordering", - message: 'messages: roles must alternate between "user" and "assistant"', - }, - }, - })); - - const { run } = createMinimalRun({ - sessionEntry, - sessionStore, - sessionKey: "main", - storePath, - }); - const res = await run(); - - const payload = Array.isArray(res) ? res[0] : res; - expect(payload).toMatchObject({ - text: expect.stringContaining("Message ordering conflict"), - }); - if (!payload) { - throw new Error("expected payload"); - } - expect(payload.text?.toLowerCase()).toContain("reset"); - expect(sessionStore.main.sessionId).not.toBe(sessionId); - await expect(fs.access(transcriptPath)).rejects.toBeDefined(); - - const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId); - }); - }); - - it("resets corrupted Gemini sessions and deletes transcripts", async () => { - await withTempStateDir(async (stateDir) => { - const { storePath, sessionEntry, sessionStore, transcriptPath } = - await writeCorruptGeminiSessionFixture({ - stateDir, - sessionId: "session-corrupt", - persistStore: true, - }); - - state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => { - throw new Error( - "function call turn comes immediately after a user turn or after a function response turn", - ); - }); - - const { run } = createMinimalRun({ - sessionEntry, - sessionStore, - sessionKey: "main", - storePath, - }); - const res = await run(); - - expect(res).toMatchObject({ - text: expect.stringContaining("Session history was corrupted"), - }); - expect(sessionStore.main).toBeUndefined(); - await expect(fs.access(transcriptPath)).rejects.toThrow(); - - const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(persisted.main).toBeUndefined(); - }); - }); - - it("keeps sessions intact on other errors", async () => { - await withTempStateDir(async (stateDir) => { - const sessionId = "session-ok"; - const storePath = path.join(stateDir, "sessions", "sessions.json"); - const sessionEntry = { sessionId, updatedAt: Date.now() }; - const sessionStore = { main: sessionEntry }; - - await fs.mkdir(path.dirname(storePath), { recursive: true }); - await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); - - const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - await fs.writeFile(transcriptPath, "ok", "utf-8"); - - state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => { - throw new Error("INVALID_ARGUMENT: some other failure"); - }); - - const { run } = createMinimalRun({ - sessionEntry, - sessionStore, - sessionKey: "main", - storePath, - }); - const res = await run(); - - expect(res).toMatchObject({ - text: expect.stringContaining("Something went wrong while processing your request"), - }); - expect(sessionStore.main).toBeDefined(); - await expect(fs.access(transcriptPath)).resolves.toBeUndefined(); - - const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(persisted.main).toBeDefined(); - }); - }); - - it("still replies even if session reset fails to persist", async () => { - await withTempStateDir(async (stateDir) => { - const saveSpy = vi - .spyOn(sessions, "saveSessionStore") - .mockRejectedValueOnce(new Error("boom")); - try { - const { storePath, sessionEntry, sessionStore, transcriptPath } = - await writeCorruptGeminiSessionFixture({ - stateDir, - sessionId: "session-corrupt", - persistStore: false, - }); - - state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => { - throw new Error( - "function call turn comes immediately after a user turn or after a function response turn", - ); - }); - - const { run } = createMinimalRun({ - sessionEntry, - sessionStore, - sessionKey: "main", - storePath, - }); - const res = await run(); - - expect(res).toMatchObject({ - text: expect.stringContaining("Session history was corrupted"), - }); - expect(sessionStore.main).toBeUndefined(); - await expect(fs.access(transcriptPath)).rejects.toThrow(); - } finally { - saveSpy.mockRestore(); - } - }); - }); - it("returns friendly message for role ordering errors thrown as exceptions", async () => { state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => { throw new Error("400 Incorrect role information"); diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 5fccbb24e77..26f704f2b2d 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -1,26 +1,15 @@ -import fs from "node:fs"; import { lookupContextTokens } from "../../agents/context.js"; import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; import { resolveModelAuthMode } from "../../agents/model-auth.js"; import { isCliProvider } from "../../agents/model-selection.js"; import { queueEmbeddedPiMessage } from "../../agents/pi-embedded.js"; import { hasNonzeroUsage } from "../../agents/usage.js"; -import { - resolveAgentIdFromSessionKey, - resolveSessionFilePath, - resolveSessionFilePathOptions, - resolveSessionTranscriptPath, - type SessionEntry, - updateSessionStore, - updateSessionStoreEntry, -} from "../../config/sessions.js"; +import { type SessionEntry, updateSessionStoreEntry } from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; import { emitAgentEvent } from "../../infra/agent-events.js"; import { emitDiagnosticEvent, isDiagnosticsEnabled } from "../../infra/diagnostic-events.js"; -import { generateSecureUuid } from "../../infra/secure-random.js"; import { enqueueSystemEvent } from "../../infra/system-events.js"; import { CommandLaneClearedError, GatewayDrainingError } from "../../process/command-queue.js"; -import { defaultRuntime } from "../../runtime.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; import { estimateUsageCost, resolveModelCostConfig } from "../../utils/usage-format.js"; import { @@ -47,6 +36,7 @@ import { hasSessionRelatedCronJobs, hasUnbackedReminderCommitment, } from "./agent-runner-reminder-guard.js"; +import { resetReplyRunSession } from "./agent-runner-session-reset.js"; import { appendUsageLine, formatResponseUsageLine } from "./agent-runner-usage-line.js"; import { resolveQueuedReplyExecutionConfig } from "./agent-runner-utils.js"; import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-reply-pipeline.js"; @@ -350,86 +340,28 @@ export async function runReplyAgent(params: { failureLabel, buildLogMessage, cleanupTranscripts, - }: SessionResetOptions): Promise => { - if (!sessionKey || !activeSessionStore || !storePath) { - return false; - } - const prevEntry = activeSessionStore[sessionKey] ?? activeSessionEntry; - if (!prevEntry) { - return false; - } - const prevSessionId = cleanupTranscripts ? prevEntry.sessionId : undefined; - const nextSessionId = generateSecureUuid(); - const nextEntry: SessionEntry = { - ...prevEntry, - sessionId: nextSessionId, - updatedAt: Date.now(), - systemSent: false, - abortedLastRun: false, - modelProvider: undefined, - model: undefined, - inputTokens: undefined, - outputTokens: undefined, - totalTokens: undefined, - totalTokensFresh: false, - estimatedCostUsd: undefined, - cacheRead: undefined, - cacheWrite: undefined, - contextTokens: undefined, - systemPromptReport: undefined, - fallbackNoticeSelectedModel: undefined, - fallbackNoticeActiveModel: undefined, - fallbackNoticeReason: undefined, - }; - const agentId = resolveAgentIdFromSessionKey(sessionKey); - const nextSessionFile = resolveSessionTranscriptPath( - nextSessionId, - agentId, - sessionCtx.MessageThreadId, - ); - nextEntry.sessionFile = nextSessionFile; - activeSessionStore[sessionKey] = nextEntry; - try { - await updateSessionStore(storePath, (store) => { - store[sessionKey] = nextEntry; - }); - } catch (err) { - defaultRuntime.error( - `Failed to persist session reset after ${failureLabel} (${sessionKey}): ${String(err)}`, - ); - } - followupRun.run.sessionId = nextSessionId; - followupRun.run.sessionFile = nextSessionFile; - refreshQueuedFollowupSession({ - key: queueKey, - previousSessionId: prevEntry.sessionId, - nextSessionId, - nextSessionFile, + }: SessionResetOptions): Promise => + await resetReplyRunSession({ + options: { + failureLabel, + buildLogMessage, + cleanupTranscripts, + }, + sessionKey, + queueKey, + activeSessionEntry, + activeSessionStore, + storePath, + messageThreadId: + typeof sessionCtx.MessageThreadId === "string" ? sessionCtx.MessageThreadId : undefined, + followupRun, + onActiveSessionEntry: (nextEntry) => { + activeSessionEntry = nextEntry; + }, + onNewSession: () => { + activeIsNewSession = true; + }, }); - activeSessionEntry = nextEntry; - activeIsNewSession = true; - defaultRuntime.error(buildLogMessage(nextSessionId)); - if (cleanupTranscripts && prevSessionId) { - const transcriptCandidates = new Set(); - const resolved = resolveSessionFilePath( - prevSessionId, - prevEntry, - resolveSessionFilePathOptions({ agentId, storePath }), - ); - if (resolved) { - transcriptCandidates.add(resolved); - } - transcriptCandidates.add(resolveSessionTranscriptPath(prevSessionId, agentId)); - for (const candidate of transcriptCandidates) { - try { - fs.unlinkSync(candidate); - } catch { - // Best-effort cleanup. - } - } - } - return true; - }; const resetSessionAfterCompactionFailure = async (reason: string): Promise => resetSession({ failureLabel: "compaction failure",