From f7537faa215b47403613736bceee430052acc438 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Thu, 23 Apr 2026 11:33:27 -0700 Subject: [PATCH] fix(agents): emit cli runner llm lifecycle hooks (#70731) --- src/agents/cli-runner.reliability.test.ts | 187 ++++++++++++++++++++++ src/agents/cli-runner.ts | 159 +++++++++++++++++- 2 files changed, 343 insertions(+), 3 deletions(-) diff --git a/src/agents/cli-runner.reliability.test.ts b/src/agents/cli-runner.reliability.test.ts index eb495361513..1aa5c218705 100644 --- a/src/agents/cli-runner.reliability.test.ts +++ b/src/agents/cli-runner.reliability.test.ts @@ -1,9 +1,14 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent"; import { afterEach, describe, expect, it, vi } from "vitest"; import { __testing as replyRunTesting, createReplyOperation, replyRunRegistry, } from "../auto-reply/reply/reply-run-registry.js"; +import { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; import { runPreparedCliAgent } from "./cli-runner.js"; import { createManagedRun, @@ -15,6 +20,43 @@ import { executePreparedCliRun } from "./cli-runner/execute.js"; import { resolveCliNoOutputTimeoutMs } from "./cli-runner/helpers.js"; import type { PreparedCliRunContext } from "./cli-runner/types.js"; +vi.mock("../plugins/hook-runner-global.js", async () => { + const actual = await vi.importActual( + "../plugins/hook-runner-global.js", + ); + return { + ...actual, + getGlobalHookRunner: vi.fn(() => null), + }; +}); + +const mockGetGlobalHookRunner = vi.mocked(getGlobalHookRunner); + +function createSessionFile(params?: { history?: Array<{ role: "user"; content: string }> }) { + const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-cli-hooks-")); + const sessionFile = path.join(dir, "session.jsonl"); + fs.writeFileSync( + sessionFile, + `${JSON.stringify({ + type: "session", + version: CURRENT_SESSION_VERSION, + id: "session-test", + timestamp: new Date(0).toISOString(), + cwd: dir, + })}\n`, + "utf-8", + ); + const sessionManager = SessionManager.open(sessionFile); + for (const entry of params?.history ?? []) { + sessionManager.appendMessage({ + role: entry.role, + content: entry.content, + timestamp: Date.now(), + }); + } + return { dir, sessionFile }; +} + function buildPreparedContext(params?: { sessionKey?: string; cliSessionId?: string; @@ -67,6 +109,7 @@ function buildPreparedContext(params?: { describe("runCliAgent reliability", () => { afterEach(() => { replyRunTesting.resetReplyRunRegistry(); + mockGetGlobalHookRunner.mockReset(); }); it("fails with timeout when no-output watchdog trips", async () => { @@ -312,6 +355,150 @@ describe("runCliAgent reliability", () => { expect(result.meta.finalAssistantVisibleText).toBe("goodbye from cli"); expect(result.meta.finalAssistantRawText).toBe("hello from cli"); }); + + it("emits llm_input, llm_output, and agent_end hooks for successful CLI runs", async () => { + const hookRunner = { + hasHooks: vi.fn((hookName: string) => + ["llm_input", "llm_output", "agent_end"].includes(hookName), + ), + runLlmInput: vi.fn(async () => undefined), + runLlmOutput: vi.fn(async () => undefined), + runAgentEnd: vi.fn(async () => undefined), + }; + mockGetGlobalHookRunner.mockReturnValue(hookRunner as never); + const { dir, sessionFile } = createSessionFile(); + + supervisorSpawnMock.mockResolvedValueOnce( + createManagedRun({ + reason: "exit", + exitCode: 0, + exitSignal: null, + durationMs: 50, + stdout: "hello from cli", + stderr: "", + timedOut: false, + noOutputTimedOut: false, + }), + ); + + try { + await runPreparedCliAgent({ + ...buildPreparedContext(), + params: { + ...buildPreparedContext().params, + sessionFile, + workspaceDir: dir, + sessionKey: "agent:main:main", + agentId: "main", + messageProvider: "acp", + messageChannel: "telegram", + trigger: "user", + }, + }); + + await vi.waitFor(() => { + expect(hookRunner.runLlmInput).toHaveBeenCalledTimes(1); + expect(hookRunner.runLlmOutput).toHaveBeenCalledTimes(1); + expect(hookRunner.runAgentEnd).toHaveBeenCalledTimes(1); + }); + + expect(hookRunner.runLlmInput).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "run-2", + sessionId: "s1", + provider: "codex-cli", + model: "gpt-5.4", + prompt: "hi", + systemPrompt: "You are a helpful assistant.", + historyMessages: expect.any(Array), + imagesCount: 0, + }), + expect.objectContaining({ + runId: "run-2", + agentId: "main", + sessionKey: "agent:main:main", + sessionId: "s1", + workspaceDir: dir, + messageProvider: "acp", + trigger: "user", + channelId: "telegram", + }), + ); + expect(hookRunner.runLlmOutput).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "run-2", + sessionId: "s1", + provider: "codex-cli", + model: "gpt-5.4", + assistantTexts: ["hello from cli"], + lastAssistant: expect.objectContaining({ + role: "assistant", + content: [{ type: "text", text: "hello from cli" }], + provider: "codex-cli", + model: "gpt-5.4", + }), + }), + expect.any(Object), + ); + expect(hookRunner.runAgentEnd).toHaveBeenCalledWith( + expect.objectContaining({ + success: true, + messages: [ + { role: "user", content: "hi", timestamp: expect.any(Number) }, + expect.objectContaining({ + role: "assistant", + content: [{ type: "text", text: "hello from cli" }], + }), + ], + }), + expect.any(Object), + ); + } finally { + fs.rmSync(dir, { recursive: true, force: true }); + } + }); + + it("emits agent_end with failure details when the CLI run fails", async () => { + const hookRunner = { + hasHooks: vi.fn((hookName: string) => ["llm_input", "agent_end"].includes(hookName)), + runLlmInput: vi.fn(async () => undefined), + runLlmOutput: vi.fn(async () => undefined), + runAgentEnd: vi.fn(async () => undefined), + }; + mockGetGlobalHookRunner.mockReturnValue(hookRunner as never); + + supervisorSpawnMock.mockResolvedValueOnce( + createManagedRun({ + reason: "exit", + exitCode: 1, + exitSignal: null, + durationMs: 50, + stdout: "", + stderr: "rate limit exceeded", + timedOut: false, + noOutputTimedOut: false, + }), + ); + + await expect(runPreparedCliAgent(buildPreparedContext())).rejects.toThrow( + "rate limit exceeded", + ); + + await vi.waitFor(() => { + expect(hookRunner.runLlmInput).toHaveBeenCalledTimes(1); + expect(hookRunner.runLlmOutput).not.toHaveBeenCalled(); + expect(hookRunner.runAgentEnd).toHaveBeenCalledTimes(1); + }); + + expect(hookRunner.runAgentEnd).toHaveBeenCalledWith( + expect.objectContaining({ + success: false, + error: "rate limit exceeded", + messages: [{ role: "user", content: "hi", timestamp: expect.any(Number) }], + }), + expect.any(Object), + ); + }); }); describe("resolveCliNoOutputTimeoutMs", () => { diff --git a/src/agents/cli-runner.ts b/src/agents/cli-runner.ts index c26e3aaced6..961fac5319b 100644 --- a/src/agents/cli-runner.ts +++ b/src/agents/cli-runner.ts @@ -1,9 +1,56 @@ +import { SessionManager } from "@mariozechner/pi-coding-agent"; import { formatErrorMessage } from "../infra/errors.js"; import type { PreparedCliRunContext, RunCliAgentParams } from "./cli-runner/types.js"; import { FailoverError, isFailoverError, resolveFailoverStatus } from "./failover-error.js"; +import { + runAgentHarnessAgentEndHook, + runAgentHarnessLlmInputHook, + runAgentHarnessLlmOutputHook, +} from "./harness/lifecycle-hook-helpers.js"; import { classifyFailoverReason, isFailoverErrorMessage } from "./pi-embedded-helpers.js"; import type { EmbeddedPiRunResult } from "./pi-embedded-runner.js"; +function loadCliHookHistoryMessages(sessionFile: string): unknown[] { + try { + const entries = SessionManager.open(sessionFile).getEntries(); + return entries.flatMap((entry) => (entry.type === "message" ? [entry.message as unknown] : [])); + } catch { + return []; + } +} + +function buildCliHookUserMessage(prompt: string): unknown { + return { + role: "user", + content: prompt, + timestamp: Date.now(), + }; +} + +function buildCliHookAssistantMessage(params: { + text: string; + provider: string; + model: string; + usage?: { + input?: number; + output?: number; + cacheRead?: number; + cacheWrite?: number; + total?: number; + }; +}): unknown { + return { + role: "assistant", + content: [{ type: "text", text: params.text }], + api: "responses", + provider: params.provider, + model: params.model, + ...(params.usage ? { usage: params.usage } : {}), + stopReason: "stop", + timestamp: Date.now(), + }; +} + export async function runCliAgent(params: RunCliAgentParams): Promise { const { prepareCliRunContext } = await import("./cli-runner/prepare.runtime.js"); const context = await prepareCliRunContext(params); @@ -15,6 +62,60 @@ export async function runPreparedCliAgent( ): Promise { const { executePreparedCliRun } = await import("./cli-runner/execute.runtime.js"); const { params } = context; + const historyMessages = loadCliHookHistoryMessages(params.sessionFile); + const llmInputEvent = { + runId: params.runId, + sessionId: params.sessionId, + provider: params.provider, + model: context.modelId, + systemPrompt: context.systemPrompt, + prompt: params.prompt, + historyMessages, + imagesCount: params.images?.length ?? 0, + } as const; + const hookContext = { + runId: params.runId, + agentId: params.agentId, + sessionKey: params.sessionKey, + sessionId: params.sessionId, + workspaceDir: params.workspaceDir, + messageProvider: params.messageProvider, + trigger: params.trigger, + channelId: params.messageChannel ?? params.messageProvider, + } as const; + + const runCliAttempt = async (cliSessionIdToUse?: string) => { + runAgentHarnessLlmInputHook({ + event: llmInputEvent, + ctx: hookContext, + }); + const output = await executePreparedCliRun(context, cliSessionIdToUse); + const assistantText = output.text.trim(); + const assistantTexts = assistantText ? [assistantText] : []; + const lastAssistant = + assistantText.length > 0 + ? buildCliHookAssistantMessage({ + text: assistantText, + provider: params.provider, + model: context.modelId, + usage: output.usage, + }) + : undefined; + runAgentHarnessLlmOutputHook({ + event: { + runId: params.runId, + sessionId: params.sessionId, + provider: params.provider, + model: context.modelId, + assistantTexts, + ...(lastAssistant ? { lastAssistant } : {}), + ...(output.usage ? { usage: output.usage } : {}), + }, + ctx: hookContext, + }); + return { output, assistantText, lastAssistant }; + }; + const buildCliRunResult = (resultParams: { output: Awaited>; effectiveCliSessionId?: string; @@ -93,8 +194,20 @@ export async function runPreparedCliAgent( // Try with the provided CLI session ID first try { try { - const output = await executePreparedCliRun(context, context.reusableCliSession.sessionId); + const { output, lastAssistant } = await runCliAttempt(context.reusableCliSession.sessionId); const effectiveCliSessionId = output.sessionId ?? context.reusableCliSession.sessionId; + runAgentHarnessAgentEndHook({ + event: { + messages: [ + ...historyMessages, + buildCliHookUserMessage(params.prompt), + ...(lastAssistant ? [lastAssistant] : []), + ], + success: true, + durationMs: Date.now() - context.started, + }, + ctx: hookContext, + }); return buildCliRunResult({ output, effectiveCliSessionId }); } catch (err) { if (isFailoverError(err)) { @@ -106,23 +219,63 @@ export async function runPreparedCliAgent( // We'll need to modify the caller to handle this case // For now, retry without the session ID to create a new session - const output = await executePreparedCliRun(context, undefined); + const { output, lastAssistant } = await runCliAttempt(undefined); const effectiveCliSessionId = output.sessionId; + runAgentHarnessAgentEndHook({ + event: { + messages: [ + ...historyMessages, + buildCliHookUserMessage(params.prompt), + ...(lastAssistant ? [lastAssistant] : []), + ], + success: true, + durationMs: Date.now() - context.started, + }, + ctx: hookContext, + }); return buildCliRunResult({ output, effectiveCliSessionId }); } + runAgentHarnessAgentEndHook({ + event: { + messages: [...historyMessages, buildCliHookUserMessage(params.prompt)], + success: false, + error: formatErrorMessage(err), + durationMs: Date.now() - context.started, + }, + ctx: hookContext, + }); throw err; } const message = formatErrorMessage(err); if (isFailoverErrorMessage(message, { provider: params.provider })) { const reason = classifyFailoverReason(message, { provider: params.provider }) ?? "unknown"; const status = resolveFailoverStatus(reason); - throw new FailoverError(message, { + const failoverError = new FailoverError(message, { reason, provider: params.provider, model: context.modelId, status, }); + runAgentHarnessAgentEndHook({ + event: { + messages: [...historyMessages, buildCliHookUserMessage(params.prompt)], + success: false, + error: message, + durationMs: Date.now() - context.started, + }, + ctx: hookContext, + }); + throw failoverError; } + runAgentHarnessAgentEndHook({ + event: { + messages: [...historyMessages, buildCliHookUserMessage(params.prompt)], + success: false, + error: message, + durationMs: Date.now() - context.started, + }, + ctx: hookContext, + }); throw err; } } finally {