mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-13 10:11:20 +00:00
test: move runReplyAgent reset state coverage to helper
This commit is contained in:
170
src/auto-reply/reply/agent-runner-session-reset.test.ts
Normal file
170
src/auto-reply/reply/agent-runner-session-reset.test.ts
Normal file
@@ -0,0 +1,170 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { SessionEntry } from "../../config/sessions.js";
|
||||
import {
|
||||
resetReplyRunSession,
|
||||
setAgentRunnerSessionResetTestDeps,
|
||||
} from "./agent-runner-session-reset.js";
|
||||
import type { FollowupRun } from "./queue.js";
|
||||
|
||||
const refreshQueuedFollowupSessionMock = vi.fn();
|
||||
const errorMock = vi.fn();
|
||||
|
||||
function createFollowupRun(): FollowupRun {
|
||||
return {
|
||||
prompt: "hello",
|
||||
summaryLine: "hello",
|
||||
enqueuedAt: Date.now(),
|
||||
run: {
|
||||
sessionId: "session",
|
||||
sessionKey: "main",
|
||||
messageProvider: "whatsapp",
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
workspaceDir: "/tmp",
|
||||
config: {},
|
||||
skillsSnapshot: {},
|
||||
provider: "anthropic",
|
||||
model: "claude",
|
||||
thinkLevel: "low",
|
||||
verboseLevel: "off",
|
||||
elevatedLevel: "off",
|
||||
bashElevated: { enabled: false, allowed: false, defaultLevel: "off" },
|
||||
timeoutMs: 1_000,
|
||||
blockReplyBreak: "message_end",
|
||||
},
|
||||
} as unknown as FollowupRun;
|
||||
}
|
||||
|
||||
async function writeSessionStore(
|
||||
storePath: string,
|
||||
sessionKey: string,
|
||||
entry: SessionEntry,
|
||||
): Promise<void> {
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(storePath, JSON.stringify({ [sessionKey]: entry }, null, 2), "utf8");
|
||||
}
|
||||
|
||||
describe("resetReplyRunSession", () => {
|
||||
let rootDir = "";
|
||||
|
||||
beforeEach(async () => {
|
||||
rootDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-reset-run-"));
|
||||
refreshQueuedFollowupSessionMock.mockReset();
|
||||
errorMock.mockReset();
|
||||
setAgentRunnerSessionResetTestDeps({
|
||||
generateSecureUuid: () => "00000000-0000-0000-0000-000000000123",
|
||||
refreshQueuedFollowupSession: refreshQueuedFollowupSessionMock as never,
|
||||
error: errorMock,
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
setAgentRunnerSessionResetTestDeps();
|
||||
await fs.rm(rootDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("rotates the session and clears stale runtime and fallback fields", async () => {
|
||||
const storePath = path.join(rootDir, "sessions.json");
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId: "session",
|
||||
updatedAt: 1,
|
||||
sessionFile: path.join(rootDir, "session.jsonl"),
|
||||
modelProvider: "qwencode",
|
||||
model: "qwen",
|
||||
contextTokens: 123,
|
||||
fallbackNoticeSelectedModel: "anthropic/claude",
|
||||
fallbackNoticeActiveModel: "openai/gpt",
|
||||
fallbackNoticeReason: "rate limit",
|
||||
systemPromptReport: {
|
||||
source: "run",
|
||||
generatedAt: 1,
|
||||
systemPrompt: { chars: 1, projectContextChars: 0, nonProjectContextChars: 1 },
|
||||
injectedWorkspaceFiles: [],
|
||||
skills: { promptChars: 0, entries: [] },
|
||||
tools: { listChars: 0, schemaChars: 0, entries: [] },
|
||||
},
|
||||
};
|
||||
const sessionStore = { main: sessionEntry };
|
||||
const followupRun = createFollowupRun();
|
||||
await writeSessionStore(storePath, "main", sessionEntry);
|
||||
|
||||
let activeSessionEntry: SessionEntry | undefined = sessionEntry;
|
||||
let isNewSession = false;
|
||||
const reset = await resetReplyRunSession({
|
||||
options: {
|
||||
failureLabel: "compaction failure",
|
||||
buildLogMessage: (next) => `reset ${next}`,
|
||||
},
|
||||
sessionKey: "main",
|
||||
queueKey: "main",
|
||||
activeSessionEntry,
|
||||
activeSessionStore: sessionStore,
|
||||
storePath,
|
||||
followupRun,
|
||||
onActiveSessionEntry: (entry) => {
|
||||
activeSessionEntry = entry;
|
||||
},
|
||||
onNewSession: () => {
|
||||
isNewSession = true;
|
||||
},
|
||||
});
|
||||
|
||||
expect(reset).toBe(true);
|
||||
expect(isNewSession).toBe(true);
|
||||
expect(activeSessionEntry?.sessionId).toBe("00000000-0000-0000-0000-000000000123");
|
||||
expect(followupRun.run.sessionId).toBe(activeSessionEntry?.sessionId);
|
||||
expect(activeSessionEntry?.modelProvider).toBeUndefined();
|
||||
expect(activeSessionEntry?.model).toBeUndefined();
|
||||
expect(activeSessionEntry?.contextTokens).toBeUndefined();
|
||||
expect(activeSessionEntry?.fallbackNoticeSelectedModel).toBeUndefined();
|
||||
expect(activeSessionEntry?.fallbackNoticeActiveModel).toBeUndefined();
|
||||
expect(activeSessionEntry?.fallbackNoticeReason).toBeUndefined();
|
||||
expect(activeSessionEntry?.systemPromptReport).toBeUndefined();
|
||||
expect(refreshQueuedFollowupSessionMock).toHaveBeenCalledWith({
|
||||
key: "main",
|
||||
previousSessionId: "session",
|
||||
nextSessionId: activeSessionEntry?.sessionId,
|
||||
nextSessionFile: activeSessionEntry?.sessionFile,
|
||||
});
|
||||
expect(errorMock).toHaveBeenCalledWith("reset 00000000-0000-0000-0000-000000000123");
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as {
|
||||
main: SessionEntry;
|
||||
};
|
||||
expect(persisted.main.sessionId).toBe(activeSessionEntry?.sessionId);
|
||||
expect(persisted.main.fallbackNoticeReason).toBeUndefined();
|
||||
});
|
||||
|
||||
it("cleans up the old transcript when requested", async () => {
|
||||
const storePath = path.join(rootDir, "sessions.json");
|
||||
const oldTranscriptPath = path.join(rootDir, "old-session.jsonl");
|
||||
await fs.writeFile(oldTranscriptPath, "old", "utf8");
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId: "old-session",
|
||||
updatedAt: 1,
|
||||
sessionFile: oldTranscriptPath,
|
||||
};
|
||||
const sessionStore = { main: sessionEntry };
|
||||
await writeSessionStore(storePath, "main", sessionEntry);
|
||||
|
||||
await resetReplyRunSession({
|
||||
options: {
|
||||
failureLabel: "role ordering conflict",
|
||||
cleanupTranscripts: true,
|
||||
buildLogMessage: (next) => `reset ${next}`,
|
||||
},
|
||||
sessionKey: "main",
|
||||
queueKey: "main",
|
||||
activeSessionEntry: sessionEntry,
|
||||
activeSessionStore: sessionStore,
|
||||
storePath,
|
||||
followupRun: createFollowupRun(),
|
||||
onActiveSessionEntry: () => {},
|
||||
onNewSession: () => {},
|
||||
});
|
||||
|
||||
await expect(fs.access(oldTranscriptPath)).rejects.toThrow();
|
||||
});
|
||||
});
|
||||
127
src/auto-reply/reply/agent-runner-session-reset.ts
Normal file
127
src/auto-reply/reply/agent-runner-session-reset.ts
Normal file
@@ -0,0 +1,127 @@
|
||||
import fs from "node:fs";
|
||||
import type { SessionEntry } from "../../config/sessions.js";
|
||||
import {
|
||||
resolveAgentIdFromSessionKey,
|
||||
resolveSessionFilePath,
|
||||
resolveSessionFilePathOptions,
|
||||
resolveSessionTranscriptPath,
|
||||
updateSessionStore,
|
||||
} from "../../config/sessions.js";
|
||||
import { generateSecureUuid } from "../../infra/secure-random.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import { refreshQueuedFollowupSession, type FollowupRun } from "./queue.js";
|
||||
|
||||
type ResetSessionOptions = {
|
||||
failureLabel: string;
|
||||
buildLogMessage: (nextSessionId: string) => string;
|
||||
cleanupTranscripts?: boolean;
|
||||
};
|
||||
|
||||
const deps = {
|
||||
generateSecureUuid,
|
||||
updateSessionStore,
|
||||
refreshQueuedFollowupSession,
|
||||
error: (message: string) => defaultRuntime.error(message),
|
||||
};
|
||||
|
||||
export function setAgentRunnerSessionResetTestDeps(overrides?: Partial<typeof deps>): void {
|
||||
Object.assign(deps, {
|
||||
generateSecureUuid,
|
||||
updateSessionStore,
|
||||
refreshQueuedFollowupSession,
|
||||
error: (message: string) => defaultRuntime.error(message),
|
||||
...overrides,
|
||||
});
|
||||
}
|
||||
|
||||
export async function resetReplyRunSession(params: {
|
||||
options: ResetSessionOptions;
|
||||
sessionKey?: string;
|
||||
queueKey: string;
|
||||
activeSessionEntry?: SessionEntry;
|
||||
activeSessionStore?: Record<string, SessionEntry>;
|
||||
storePath?: string;
|
||||
messageThreadId?: string;
|
||||
followupRun: FollowupRun;
|
||||
onActiveSessionEntry: (entry: SessionEntry) => void;
|
||||
onNewSession: (newSessionId: string, nextSessionFile: string) => void;
|
||||
}): Promise<boolean> {
|
||||
if (!params.sessionKey || !params.activeSessionStore || !params.storePath) {
|
||||
return false;
|
||||
}
|
||||
const prevEntry = params.activeSessionStore[params.sessionKey] ?? params.activeSessionEntry;
|
||||
if (!prevEntry) {
|
||||
return false;
|
||||
}
|
||||
const prevSessionId = params.options.cleanupTranscripts ? prevEntry.sessionId : undefined;
|
||||
const nextSessionId = deps.generateSecureUuid();
|
||||
const nextEntry: SessionEntry = {
|
||||
...prevEntry,
|
||||
sessionId: nextSessionId,
|
||||
updatedAt: Date.now(),
|
||||
systemSent: false,
|
||||
abortedLastRun: false,
|
||||
modelProvider: undefined,
|
||||
model: undefined,
|
||||
inputTokens: undefined,
|
||||
outputTokens: undefined,
|
||||
totalTokens: undefined,
|
||||
totalTokensFresh: false,
|
||||
estimatedCostUsd: undefined,
|
||||
cacheRead: undefined,
|
||||
cacheWrite: undefined,
|
||||
contextTokens: undefined,
|
||||
systemPromptReport: undefined,
|
||||
fallbackNoticeSelectedModel: undefined,
|
||||
fallbackNoticeActiveModel: undefined,
|
||||
fallbackNoticeReason: undefined,
|
||||
};
|
||||
const agentId = resolveAgentIdFromSessionKey(params.sessionKey);
|
||||
const nextSessionFile = resolveSessionTranscriptPath(
|
||||
nextSessionId,
|
||||
agentId,
|
||||
params.messageThreadId,
|
||||
);
|
||||
nextEntry.sessionFile = nextSessionFile;
|
||||
params.activeSessionStore[params.sessionKey] = nextEntry;
|
||||
try {
|
||||
await deps.updateSessionStore(params.storePath, (store) => {
|
||||
store[params.sessionKey!] = nextEntry;
|
||||
});
|
||||
} catch (err) {
|
||||
deps.error(
|
||||
`Failed to persist session reset after ${params.options.failureLabel} (${params.sessionKey}): ${String(err)}`,
|
||||
);
|
||||
}
|
||||
params.followupRun.run.sessionId = nextSessionId;
|
||||
params.followupRun.run.sessionFile = nextSessionFile;
|
||||
deps.refreshQueuedFollowupSession({
|
||||
key: params.queueKey,
|
||||
previousSessionId: prevEntry.sessionId,
|
||||
nextSessionId,
|
||||
nextSessionFile,
|
||||
});
|
||||
params.onActiveSessionEntry(nextEntry);
|
||||
params.onNewSession(nextSessionId, nextSessionFile);
|
||||
deps.error(params.options.buildLogMessage(nextSessionId));
|
||||
if (params.options.cleanupTranscripts && prevSessionId) {
|
||||
const transcriptCandidates = new Set<string>();
|
||||
const resolved = resolveSessionFilePath(
|
||||
prevSessionId,
|
||||
prevEntry,
|
||||
resolveSessionFilePathOptions({ agentId, storePath: params.storePath }),
|
||||
);
|
||||
if (resolved) {
|
||||
transcriptCandidates.add(resolved);
|
||||
}
|
||||
transcriptCandidates.add(resolveSessionTranscriptPath(prevSessionId, agentId));
|
||||
for (const candidate of transcriptCandidates) {
|
||||
try {
|
||||
fs.unlinkSync(candidate);
|
||||
} catch {
|
||||
// Best-effort cleanup.
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@@ -1,10 +1,6 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { SessionEntry } from "../../config/sessions.js";
|
||||
import * as sessions from "../../config/sessions.js";
|
||||
import type { TypingMode } from "../../config/types.js";
|
||||
import { withStateDirEnv } from "../../test-helpers/state-dir-env.js";
|
||||
import type { TemplateContext } from "../templating.js";
|
||||
import type { GetReplyOptions } from "../types.js";
|
||||
import {
|
||||
@@ -261,34 +257,6 @@ describe("runReplyAgent heartbeat followup guard", () => {
|
||||
});
|
||||
|
||||
describe("runReplyAgent typing (heartbeat)", () => {
|
||||
async function withTempStateDir<T>(fn: (stateDir: string) => Promise<T>): Promise<T> {
|
||||
return await withStateDirEnv(
|
||||
"openclaw-typing-heartbeat-",
|
||||
async ({ stateDir }) => await fn(stateDir),
|
||||
);
|
||||
}
|
||||
|
||||
async function writeCorruptGeminiSessionFixture(params: {
|
||||
stateDir: string;
|
||||
sessionId: string;
|
||||
persistStore: boolean;
|
||||
}) {
|
||||
const storePath = path.join(params.stateDir, "sessions", "sessions.json");
|
||||
const sessionEntry: SessionEntry = { sessionId: params.sessionId, updatedAt: Date.now() };
|
||||
const sessionStore = { main: sessionEntry };
|
||||
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
if (params.persistStore) {
|
||||
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
|
||||
}
|
||||
|
||||
const transcriptPath = sessions.resolveSessionTranscriptPath(params.sessionId);
|
||||
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
|
||||
await fs.writeFile(transcriptPath, "bad", "utf-8");
|
||||
|
||||
return { storePath, sessionEntry, sessionStore, transcriptPath };
|
||||
}
|
||||
|
||||
it("signals typing for normal runs", async () => {
|
||||
const onPartialReply = vi.fn();
|
||||
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
|
||||
@@ -1065,189 +1033,6 @@ describe("runReplyAgent typing (heartbeat)", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("retries after compaction failure by resetting the session", async () => {
|
||||
await withTempStateDir(async (stateDir) => {
|
||||
const sessionId = "session";
|
||||
const storePath = path.join(stateDir, "sessions", "sessions.json");
|
||||
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId,
|
||||
updatedAt: Date.now(),
|
||||
sessionFile: transcriptPath,
|
||||
fallbackNoticeSelectedModel: "fireworks/accounts/fireworks/routers/kimi-k2p5-turbo",
|
||||
fallbackNoticeActiveModel: "deepinfra/moonshotai/Kimi-K2.5",
|
||||
fallbackNoticeReason: "rate limit",
|
||||
};
|
||||
const sessionStore = { main: sessionEntry };
|
||||
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
|
||||
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
|
||||
await fs.writeFile(transcriptPath, "ok", "utf-8");
|
||||
|
||||
state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
|
||||
throw new Error(
|
||||
'Context overflow: Summarization failed: 400 {"message":"prompt is too long"}',
|
||||
);
|
||||
});
|
||||
|
||||
const { run } = createMinimalRun({
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey: "main",
|
||||
storePath,
|
||||
});
|
||||
const res = await run();
|
||||
|
||||
expect(state.runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
|
||||
const payload = Array.isArray(res) ? res[0] : res;
|
||||
expect(payload).toMatchObject({
|
||||
text: expect.stringContaining("Context limit exceeded during compaction"),
|
||||
});
|
||||
if (!payload) {
|
||||
throw new Error("expected payload");
|
||||
}
|
||||
expect(payload.text?.toLowerCase()).toContain("reset");
|
||||
expect(sessionStore.main.sessionId).not.toBe(sessionId);
|
||||
expect(sessionStore.main.fallbackNoticeSelectedModel).toBeUndefined();
|
||||
expect(sessionStore.main.fallbackNoticeActiveModel).toBeUndefined();
|
||||
expect(sessionStore.main.fallbackNoticeReason).toBeUndefined();
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
|
||||
expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId);
|
||||
expect(persisted.main.fallbackNoticeSelectedModel).toBeUndefined();
|
||||
expect(persisted.main.fallbackNoticeActiveModel).toBeUndefined();
|
||||
expect(persisted.main.fallbackNoticeReason).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
it("retries after context overflow payload by resetting the session", async () => {
|
||||
await withTempStateDir(async (stateDir) => {
|
||||
const sessionId = "session";
|
||||
const storePath = path.join(stateDir, "sessions", "sessions.json");
|
||||
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
|
||||
const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath };
|
||||
const sessionStore = { main: sessionEntry };
|
||||
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
|
||||
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
|
||||
await fs.writeFile(transcriptPath, "ok", "utf-8");
|
||||
|
||||
state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({
|
||||
payloads: [{ text: "Context overflow: prompt too large", isError: true }],
|
||||
meta: {
|
||||
durationMs: 1,
|
||||
error: {
|
||||
kind: "context_overflow",
|
||||
message: 'Context overflow: Summarization failed: 400 {"message":"prompt is too long"}',
|
||||
},
|
||||
},
|
||||
}));
|
||||
|
||||
const { run } = createMinimalRun({
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey: "main",
|
||||
storePath,
|
||||
});
|
||||
const res = await run();
|
||||
|
||||
expect(state.runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
|
||||
const payload = Array.isArray(res) ? res[0] : res;
|
||||
expect(payload).toMatchObject({
|
||||
text: expect.stringContaining("Context limit exceeded"),
|
||||
});
|
||||
if (!payload) {
|
||||
throw new Error("expected payload");
|
||||
}
|
||||
expect(payload.text?.toLowerCase()).toContain("reset");
|
||||
expect(sessionStore.main.sessionId).not.toBe(sessionId);
|
||||
expect(vi.mocked(refreshQueuedFollowupSession)).toHaveBeenCalledWith({
|
||||
key: "main",
|
||||
previousSessionId: sessionId,
|
||||
nextSessionId: sessionStore.main.sessionId,
|
||||
nextSessionFile: expect.stringContaining(".jsonl"),
|
||||
});
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
|
||||
expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId);
|
||||
});
|
||||
});
|
||||
|
||||
it("clears stale runtime model fields when resetSession retries after compaction failure", async () => {
|
||||
await withTempStateDir(async (stateDir) => {
|
||||
const sessionId = "session-stale-model";
|
||||
const storePath = path.join(stateDir, "sessions", "sessions.json");
|
||||
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId,
|
||||
updatedAt: Date.now(),
|
||||
sessionFile: transcriptPath,
|
||||
modelProvider: "qwencode",
|
||||
model: "qwen3.5-plus-2026-02-15",
|
||||
contextTokens: 123456,
|
||||
systemPromptReport: {
|
||||
source: "run",
|
||||
generatedAt: Date.now(),
|
||||
sessionId,
|
||||
sessionKey: "main",
|
||||
provider: "qwencode",
|
||||
model: "qwen3.5-plus-2026-02-15",
|
||||
workspaceDir: stateDir,
|
||||
bootstrapMaxChars: 1000,
|
||||
bootstrapTotalMaxChars: 2000,
|
||||
systemPrompt: {
|
||||
chars: 10,
|
||||
projectContextChars: 5,
|
||||
nonProjectContextChars: 5,
|
||||
},
|
||||
injectedWorkspaceFiles: [],
|
||||
skills: {
|
||||
promptChars: 0,
|
||||
entries: [],
|
||||
},
|
||||
tools: {
|
||||
listChars: 0,
|
||||
schemaChars: 0,
|
||||
entries: [],
|
||||
},
|
||||
},
|
||||
};
|
||||
const sessionStore = { main: sessionEntry };
|
||||
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
|
||||
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
|
||||
await fs.writeFile(transcriptPath, "ok", "utf-8");
|
||||
|
||||
state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
|
||||
throw new Error(
|
||||
'Context overflow: Summarization failed: 400 {"message":"prompt is too long"}',
|
||||
);
|
||||
});
|
||||
|
||||
const { run } = createMinimalRun({
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey: "main",
|
||||
storePath,
|
||||
});
|
||||
await run();
|
||||
|
||||
expect(sessionStore.main.modelProvider).toBeUndefined();
|
||||
expect(sessionStore.main.model).toBeUndefined();
|
||||
expect(sessionStore.main.contextTokens).toBeUndefined();
|
||||
expect(sessionStore.main.systemPromptReport).toBeUndefined();
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
|
||||
expect(persisted.main.modelProvider).toBeUndefined();
|
||||
expect(persisted.main.model).toBeUndefined();
|
||||
expect(persisted.main.contextTokens).toBeUndefined();
|
||||
expect(persisted.main.systemPromptReport).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
it("surfaces overflow fallback when embedded run returns empty payloads", async () => {
|
||||
state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({
|
||||
payloads: [],
|
||||
@@ -1296,163 +1081,6 @@ describe("runReplyAgent typing (heartbeat)", () => {
|
||||
expect(payload.text).toContain("/new");
|
||||
});
|
||||
|
||||
it("resets the session after role ordering payloads", async () => {
|
||||
await withTempStateDir(async (stateDir) => {
|
||||
const sessionId = "session";
|
||||
const storePath = path.join(stateDir, "sessions", "sessions.json");
|
||||
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
|
||||
const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath };
|
||||
const sessionStore = { main: sessionEntry };
|
||||
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
|
||||
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
|
||||
await fs.writeFile(transcriptPath, "ok", "utf-8");
|
||||
|
||||
state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({
|
||||
payloads: [{ text: "Message ordering conflict - please try again.", isError: true }],
|
||||
meta: {
|
||||
durationMs: 1,
|
||||
error: {
|
||||
kind: "role_ordering",
|
||||
message: 'messages: roles must alternate between "user" and "assistant"',
|
||||
},
|
||||
},
|
||||
}));
|
||||
|
||||
const { run } = createMinimalRun({
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey: "main",
|
||||
storePath,
|
||||
});
|
||||
const res = await run();
|
||||
|
||||
const payload = Array.isArray(res) ? res[0] : res;
|
||||
expect(payload).toMatchObject({
|
||||
text: expect.stringContaining("Message ordering conflict"),
|
||||
});
|
||||
if (!payload) {
|
||||
throw new Error("expected payload");
|
||||
}
|
||||
expect(payload.text?.toLowerCase()).toContain("reset");
|
||||
expect(sessionStore.main.sessionId).not.toBe(sessionId);
|
||||
await expect(fs.access(transcriptPath)).rejects.toBeDefined();
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
|
||||
expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId);
|
||||
});
|
||||
});
|
||||
|
||||
it("resets corrupted Gemini sessions and deletes transcripts", async () => {
|
||||
await withTempStateDir(async (stateDir) => {
|
||||
const { storePath, sessionEntry, sessionStore, transcriptPath } =
|
||||
await writeCorruptGeminiSessionFixture({
|
||||
stateDir,
|
||||
sessionId: "session-corrupt",
|
||||
persistStore: true,
|
||||
});
|
||||
|
||||
state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
|
||||
throw new Error(
|
||||
"function call turn comes immediately after a user turn or after a function response turn",
|
||||
);
|
||||
});
|
||||
|
||||
const { run } = createMinimalRun({
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey: "main",
|
||||
storePath,
|
||||
});
|
||||
const res = await run();
|
||||
|
||||
expect(res).toMatchObject({
|
||||
text: expect.stringContaining("Session history was corrupted"),
|
||||
});
|
||||
expect(sessionStore.main).toBeUndefined();
|
||||
await expect(fs.access(transcriptPath)).rejects.toThrow();
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
|
||||
expect(persisted.main).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps sessions intact on other errors", async () => {
|
||||
await withTempStateDir(async (stateDir) => {
|
||||
const sessionId = "session-ok";
|
||||
const storePath = path.join(stateDir, "sessions", "sessions.json");
|
||||
const sessionEntry = { sessionId, updatedAt: Date.now() };
|
||||
const sessionStore = { main: sessionEntry };
|
||||
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
|
||||
|
||||
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
|
||||
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
|
||||
await fs.writeFile(transcriptPath, "ok", "utf-8");
|
||||
|
||||
state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
|
||||
throw new Error("INVALID_ARGUMENT: some other failure");
|
||||
});
|
||||
|
||||
const { run } = createMinimalRun({
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey: "main",
|
||||
storePath,
|
||||
});
|
||||
const res = await run();
|
||||
|
||||
expect(res).toMatchObject({
|
||||
text: expect.stringContaining("Something went wrong while processing your request"),
|
||||
});
|
||||
expect(sessionStore.main).toBeDefined();
|
||||
await expect(fs.access(transcriptPath)).resolves.toBeUndefined();
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
|
||||
expect(persisted.main).toBeDefined();
|
||||
});
|
||||
});
|
||||
|
||||
it("still replies even if session reset fails to persist", async () => {
|
||||
await withTempStateDir(async (stateDir) => {
|
||||
const saveSpy = vi
|
||||
.spyOn(sessions, "saveSessionStore")
|
||||
.mockRejectedValueOnce(new Error("boom"));
|
||||
try {
|
||||
const { storePath, sessionEntry, sessionStore, transcriptPath } =
|
||||
await writeCorruptGeminiSessionFixture({
|
||||
stateDir,
|
||||
sessionId: "session-corrupt",
|
||||
persistStore: false,
|
||||
});
|
||||
|
||||
state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
|
||||
throw new Error(
|
||||
"function call turn comes immediately after a user turn or after a function response turn",
|
||||
);
|
||||
});
|
||||
|
||||
const { run } = createMinimalRun({
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey: "main",
|
||||
storePath,
|
||||
});
|
||||
const res = await run();
|
||||
|
||||
expect(res).toMatchObject({
|
||||
text: expect.stringContaining("Session history was corrupted"),
|
||||
});
|
||||
expect(sessionStore.main).toBeUndefined();
|
||||
await expect(fs.access(transcriptPath)).rejects.toThrow();
|
||||
} finally {
|
||||
saveSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
it("returns friendly message for role ordering errors thrown as exceptions", async () => {
|
||||
state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
|
||||
throw new Error("400 Incorrect role information");
|
||||
|
||||
@@ -1,26 +1,15 @@
|
||||
import fs from "node:fs";
|
||||
import { lookupContextTokens } from "../../agents/context.js";
|
||||
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
|
||||
import { resolveModelAuthMode } from "../../agents/model-auth.js";
|
||||
import { isCliProvider } from "../../agents/model-selection.js";
|
||||
import { queueEmbeddedPiMessage } from "../../agents/pi-embedded.js";
|
||||
import { hasNonzeroUsage } from "../../agents/usage.js";
|
||||
import {
|
||||
resolveAgentIdFromSessionKey,
|
||||
resolveSessionFilePath,
|
||||
resolveSessionFilePathOptions,
|
||||
resolveSessionTranscriptPath,
|
||||
type SessionEntry,
|
||||
updateSessionStore,
|
||||
updateSessionStoreEntry,
|
||||
} from "../../config/sessions.js";
|
||||
import { type SessionEntry, updateSessionStoreEntry } from "../../config/sessions.js";
|
||||
import type { TypingMode } from "../../config/types.js";
|
||||
import { emitAgentEvent } from "../../infra/agent-events.js";
|
||||
import { emitDiagnosticEvent, isDiagnosticsEnabled } from "../../infra/diagnostic-events.js";
|
||||
import { generateSecureUuid } from "../../infra/secure-random.js";
|
||||
import { enqueueSystemEvent } from "../../infra/system-events.js";
|
||||
import { CommandLaneClearedError, GatewayDrainingError } from "../../process/command-queue.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import { normalizeOptionalString } from "../../shared/string-coerce.js";
|
||||
import { estimateUsageCost, resolveModelCostConfig } from "../../utils/usage-format.js";
|
||||
import {
|
||||
@@ -47,6 +36,7 @@ import {
|
||||
hasSessionRelatedCronJobs,
|
||||
hasUnbackedReminderCommitment,
|
||||
} from "./agent-runner-reminder-guard.js";
|
||||
import { resetReplyRunSession } from "./agent-runner-session-reset.js";
|
||||
import { appendUsageLine, formatResponseUsageLine } from "./agent-runner-usage-line.js";
|
||||
import { resolveQueuedReplyExecutionConfig } from "./agent-runner-utils.js";
|
||||
import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-reply-pipeline.js";
|
||||
@@ -350,86 +340,28 @@ export async function runReplyAgent(params: {
|
||||
failureLabel,
|
||||
buildLogMessage,
|
||||
cleanupTranscripts,
|
||||
}: SessionResetOptions): Promise<boolean> => {
|
||||
if (!sessionKey || !activeSessionStore || !storePath) {
|
||||
return false;
|
||||
}
|
||||
const prevEntry = activeSessionStore[sessionKey] ?? activeSessionEntry;
|
||||
if (!prevEntry) {
|
||||
return false;
|
||||
}
|
||||
const prevSessionId = cleanupTranscripts ? prevEntry.sessionId : undefined;
|
||||
const nextSessionId = generateSecureUuid();
|
||||
const nextEntry: SessionEntry = {
|
||||
...prevEntry,
|
||||
sessionId: nextSessionId,
|
||||
updatedAt: Date.now(),
|
||||
systemSent: false,
|
||||
abortedLastRun: false,
|
||||
modelProvider: undefined,
|
||||
model: undefined,
|
||||
inputTokens: undefined,
|
||||
outputTokens: undefined,
|
||||
totalTokens: undefined,
|
||||
totalTokensFresh: false,
|
||||
estimatedCostUsd: undefined,
|
||||
cacheRead: undefined,
|
||||
cacheWrite: undefined,
|
||||
contextTokens: undefined,
|
||||
systemPromptReport: undefined,
|
||||
fallbackNoticeSelectedModel: undefined,
|
||||
fallbackNoticeActiveModel: undefined,
|
||||
fallbackNoticeReason: undefined,
|
||||
};
|
||||
const agentId = resolveAgentIdFromSessionKey(sessionKey);
|
||||
const nextSessionFile = resolveSessionTranscriptPath(
|
||||
nextSessionId,
|
||||
agentId,
|
||||
sessionCtx.MessageThreadId,
|
||||
);
|
||||
nextEntry.sessionFile = nextSessionFile;
|
||||
activeSessionStore[sessionKey] = nextEntry;
|
||||
try {
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
store[sessionKey] = nextEntry;
|
||||
});
|
||||
} catch (err) {
|
||||
defaultRuntime.error(
|
||||
`Failed to persist session reset after ${failureLabel} (${sessionKey}): ${String(err)}`,
|
||||
);
|
||||
}
|
||||
followupRun.run.sessionId = nextSessionId;
|
||||
followupRun.run.sessionFile = nextSessionFile;
|
||||
refreshQueuedFollowupSession({
|
||||
key: queueKey,
|
||||
previousSessionId: prevEntry.sessionId,
|
||||
nextSessionId,
|
||||
nextSessionFile,
|
||||
}: SessionResetOptions): Promise<boolean> =>
|
||||
await resetReplyRunSession({
|
||||
options: {
|
||||
failureLabel,
|
||||
buildLogMessage,
|
||||
cleanupTranscripts,
|
||||
},
|
||||
sessionKey,
|
||||
queueKey,
|
||||
activeSessionEntry,
|
||||
activeSessionStore,
|
||||
storePath,
|
||||
messageThreadId:
|
||||
typeof sessionCtx.MessageThreadId === "string" ? sessionCtx.MessageThreadId : undefined,
|
||||
followupRun,
|
||||
onActiveSessionEntry: (nextEntry) => {
|
||||
activeSessionEntry = nextEntry;
|
||||
},
|
||||
onNewSession: () => {
|
||||
activeIsNewSession = true;
|
||||
},
|
||||
});
|
||||
activeSessionEntry = nextEntry;
|
||||
activeIsNewSession = true;
|
||||
defaultRuntime.error(buildLogMessage(nextSessionId));
|
||||
if (cleanupTranscripts && prevSessionId) {
|
||||
const transcriptCandidates = new Set<string>();
|
||||
const resolved = resolveSessionFilePath(
|
||||
prevSessionId,
|
||||
prevEntry,
|
||||
resolveSessionFilePathOptions({ agentId, storePath }),
|
||||
);
|
||||
if (resolved) {
|
||||
transcriptCandidates.add(resolved);
|
||||
}
|
||||
transcriptCandidates.add(resolveSessionTranscriptPath(prevSessionId, agentId));
|
||||
for (const candidate of transcriptCandidates) {
|
||||
try {
|
||||
fs.unlinkSync(candidate);
|
||||
} catch {
|
||||
// Best-effort cleanup.
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
};
|
||||
const resetSessionAfterCompactionFailure = async (reason: string): Promise<boolean> =>
|
||||
resetSession({
|
||||
failureLabel: "compaction failure",
|
||||
|
||||
Reference in New Issue
Block a user