diff --git a/src/auto-reply/reply/commands-core.ts b/src/auto-reply/reply/commands-core.ts index 456aa4da52a..94ca6ed102b 100644 --- a/src/auto-reply/reply/commands-core.ts +++ b/src/auto-reply/reply/commands-core.ts @@ -55,6 +55,7 @@ export async function emitResetCommandHooks(params: { sessionKey?: string; sessionEntry?: HandleCommandsParams["sessionEntry"]; previousSessionEntry?: HandleCommandsParams["previousSessionEntry"]; + storePath?: HandleCommandsParams["storePath"]; workspaceDir: string; }): Promise { const hookEvent = createInternalHookEvent("command", params.action, params.sessionKey ?? "", { @@ -95,6 +96,7 @@ export async function emitResetCommandHooks(params: { previousSessionEntry: params.previousSessionEntry, workspaceDir: params.workspaceDir, reason: params.action, + storePath: params.storePath, }); } @@ -211,6 +213,7 @@ export async function handleCommands(params: HandleCommandsParams): Promise ({ + hasHooks: vi.fn(), + runBeforeReset: vi.fn(), +})); + +vi.mock("../../plugins/hook-runner-global.js", () => ({ + getGlobalHookRunner: () => + ({ + hasHooks: hookRunnerMocks.hasHooks, + runBeforeReset: hookRunnerMocks.runBeforeReset, + }) as unknown as HookRunner, +})); + +const { emitBeforeResetPluginHook } = await import("./reset-hooks.js"); + +describe("emitBeforeResetPluginHook", () => { + let tempDir: string; + let storePath: string; + + beforeEach(async () => { + tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-before-reset-")); + storePath = path.join(tempDir, "sessions.json"); + hookRunnerMocks.hasHooks.mockReset(); + hookRunnerMocks.runBeforeReset.mockReset(); + hookRunnerMocks.hasHooks.mockImplementation((hookName) => hookName === "before_reset"); + hookRunnerMocks.runBeforeReset.mockResolvedValue(undefined); + }); + + afterEach(async () => { + await fs.rm(tempDir, { recursive: true, force: true }); + vi.restoreAllMocks(); + }); + + it("re-resolves transcript paths within the session store directory", async () => { + const transcriptPath = path.join(tempDir, "sess-main.jsonl"); + await fs.writeFile( + transcriptPath, + `${JSON.stringify({ type: "message", message: { role: "user", content: "hello" } })}\n`, + "utf-8", + ); + const resolvedTranscriptPath = await fs.realpath(transcriptPath).catch(() => transcriptPath); + + await emitBeforeResetPluginHook({ + sessionKey: "agent:main:main", + previousSessionEntry: { + sessionId: "sess-main", + sessionFile: "../../etc/passwd", + }, + workspaceDir: "/tmp/openclaw-workspace", + reason: "new", + storePath, + }); + + expect(hookRunnerMocks.runBeforeReset).toHaveBeenCalledWith( + expect.objectContaining({ + sessionFile: resolvedTranscriptPath, + messages: [{ role: "user", content: "hello" }], + reason: "new", + }), + expect.objectContaining({ + agentId: "main", + sessionKey: "agent:main:main", + sessionId: "sess-main", + }), + ); + }); + + it("caps extracted transcript messages to a bounded maximum", async () => { + const transcriptPath = path.join(tempDir, "sess-cap.jsonl"); + const lines = Array.from({ length: 1_050 }, (_, index) => + JSON.stringify({ type: "message", message: { role: "user", content: `m-${index}` } }), + ).join("\n"); + await fs.writeFile(transcriptPath, `${lines}\n`, "utf-8"); + + await emitBeforeResetPluginHook({ + sessionKey: "agent:main:main", + previousSessionEntry: { + sessionId: "sess-cap", + sessionFile: "sess-cap.jsonl", + }, + workspaceDir: "/tmp/openclaw-workspace", + reason: "reset", + storePath, + }); + + const [event] = hookRunnerMocks.runBeforeReset.mock.calls[0] ?? []; + const messages = event?.messages; + expect(Array.isArray(messages)).toBe(true); + expect(messages).toHaveLength(1_000); + expect(messages?.[0]).toEqual({ role: "user", content: "m-0" }); + expect(messages?.at(-1)).toEqual({ role: "user", content: "m-999" }); + }); +}); diff --git a/src/auto-reply/reply/reset-hooks.ts b/src/auto-reply/reply/reset-hooks.ts index 3e43851eb3a..2ebe946135e 100644 --- a/src/auto-reply/reply/reset-hooks.ts +++ b/src/auto-reply/reply/reset-hooks.ts @@ -1,4 +1,7 @@ +import { createReadStream } from "node:fs"; import fs from "node:fs/promises"; +import readline from "node:readline"; +import { resolveSessionFilePath, resolveSessionFilePathOptions } from "../../config/sessions.js"; import { logVerbose } from "../../globals.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js"; @@ -8,11 +11,68 @@ type BeforeResetSessionEntry = { sessionFile?: string; } | null; +const MAX_BEFORE_RESET_TRANSCRIPT_BYTES = 2 * 1024 * 1024; +const MAX_BEFORE_RESET_TRANSCRIPT_LINES = 10_000; +const MAX_BEFORE_RESET_MESSAGES = 1_000; + +async function readBoundedBeforeResetMessages(sessionFile: string): Promise { + const stat = await fs.stat(sessionFile); + if (stat.size > MAX_BEFORE_RESET_TRANSCRIPT_BYTES) { + logVerbose( + `before_reset: transcript exceeds ${MAX_BEFORE_RESET_TRANSCRIPT_BYTES} bytes; skipping message extraction`, + ); + return []; + } + + const messages: unknown[] = []; + let lineCount = 0; + let bytesRead = 0; + let truncated = false; + const stream = createReadStream(sessionFile, { encoding: "utf-8" }); + const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); + + try { + for await (const line of rl) { + lineCount += 1; + bytesRead += Buffer.byteLength(line, "utf-8") + 1; + if ( + lineCount > MAX_BEFORE_RESET_TRANSCRIPT_LINES || + bytesRead > MAX_BEFORE_RESET_TRANSCRIPT_BYTES || + messages.length >= MAX_BEFORE_RESET_MESSAGES + ) { + truncated = true; + break; + } + if (!line.trim()) { + continue; + } + try { + const entry = JSON.parse(line); + if (entry.type === "message" && entry.message) { + messages.push(entry.message); + } + } catch { + // Skip malformed transcript lines. + } + } + } finally { + rl.close(); + stream.destroy(); + } + + if (truncated) { + logVerbose("before_reset: transcript parsing truncated to bounded limits"); + } + + return messages; +} + export async function emitBeforeResetPluginHook(params: { sessionKey?: string; previousSessionEntry?: BeforeResetSessionEntry; workspaceDir: string; reason: string; + storePath?: string; }): Promise { const hookRunner = getGlobalHookRunner(); if (!hookRunner?.hasHooks("before_reset")) { @@ -20,34 +80,34 @@ export async function emitBeforeResetPluginHook(params: { } const prevEntry = params.previousSessionEntry; - const sessionFile = prevEntry?.sessionFile; + const sessionId = prevEntry?.sessionId; + const agentId = resolveAgentIdFromSessionKey(params.sessionKey); + const pathOpts = resolveSessionFilePathOptions({ + agentId, + storePath: params.storePath, + }); + let sessionFile: string | undefined; try { - const messages: unknown[] = []; - if (sessionFile) { - const content = await fs.readFile(sessionFile, "utf-8"); - for (const line of content.split("\n")) { - if (!line.trim()) { - continue; - } - try { - const entry = JSON.parse(line); - if (entry.type === "message" && entry.message) { - messages.push(entry.message); - } - } catch { - // Skip malformed transcript lines. - } + let messages: unknown[] = []; + if (sessionId) { + sessionFile = resolveSessionFilePath(sessionId, prevEntry ?? undefined, pathOpts); + try { + messages = await readBoundedBeforeResetMessages(sessionFile); + } catch (err: unknown) { + logVerbose(`before_reset: failed reading transcript messages: ${String(err)}`); } + } else if (prevEntry?.sessionFile) { + logVerbose("before_reset: session file present without session id; skipping transcript read"); } else { logVerbose("before_reset: no session file available, firing hook with empty messages"); } await hookRunner.runBeforeReset( { sessionFile, messages, reason: params.reason }, { - agentId: resolveAgentIdFromSessionKey(params.sessionKey), + agentId, sessionKey: params.sessionKey, - sessionId: prevEntry?.sessionId, + sessionId, workspaceDir: params.workspaceDir, }, ); diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index 72a29c429e4..87149b395d1 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -508,6 +508,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { previousSessionEntry: entry, workspaceDir: resolveAgentWorkspaceDir(cfg, target.agentId), reason: commandReason, + storePath, }); let oldSessionId: string | undefined; let oldSessionFile: string | undefined; diff --git a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts index 0e180b7d689..a97cc249add 100644 --- a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts +++ b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts @@ -1191,6 +1191,9 @@ describe("gateway server sessions", () => { test("sessions.reset runs before_reset plugin hooks with gateway session context", async () => { const { dir } = await createSessionStoreDir(); await writeSingleLineSession(dir, "sess-main", "hello"); + const resolvedTranscriptPath = await fs + .realpath(path.join(dir, "sess-main.jsonl")) + .catch(() => path.join(dir, "sess-main.jsonl")); await writeSessionStore({ entries: { @@ -1213,7 +1216,7 @@ describe("gateway server sessions", () => { await vi.waitFor(() => expect(beforeResetHookMocks.runBeforeReset).toHaveBeenCalledTimes(1)); expect(beforeResetHookMocks.runBeforeReset).toHaveBeenCalledWith( expect.objectContaining({ - sessionFile: path.join(dir, "sess-main.jsonl"), + sessionFile: resolvedTranscriptPath, reason: "new", }), expect.objectContaining({