fix(agents): emit cli runner llm lifecycle hooks (#70731)

This commit is contained in:
Vincent Koc
2026-04-23 11:33:27 -07:00
committed by GitHub
parent c1f423f845
commit f7537faa21
2 changed files with 343 additions and 3 deletions

View File

@@ -1,9 +1,14 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
__testing as replyRunTesting,
createReplyOperation,
replyRunRegistry,
} from "../auto-reply/reply/reply-run-registry.js";
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
import { runPreparedCliAgent } from "./cli-runner.js";
import {
createManagedRun,
@@ -15,6 +20,43 @@ import { executePreparedCliRun } from "./cli-runner/execute.js";
import { resolveCliNoOutputTimeoutMs } from "./cli-runner/helpers.js";
import type { PreparedCliRunContext } from "./cli-runner/types.js";
// Partially mock the global hook-runner module: every real export is kept, but
// getGlobalHookRunner is replaced by a vi.fn that returns null until a test overrides it.
vi.mock("../plugins/hook-runner-global.js", async () => {
const actual = await vi.importActual<typeof import("../plugins/hook-runner-global.js")>(
"../plugins/hook-runner-global.js",
);
return {
...actual,
getGlobalHookRunner: vi.fn(() => null),
};
});
// Typed handle on the mocked getter so tests can install a stub runner via mockReturnValue.
const mockGetGlobalHookRunner = vi.mocked(getGlobalHookRunner);
/**
 * Creates a throwaway session file inside a fresh temporary directory.
 *
 * A single session header record is written first; each optional history entry is then
 * appended through a `SessionManager` opened on that file.
 *
 * @param params - Optional settings; `history` lists user messages to append after the header.
 * @returns The temporary directory and the path of the session file inside it. The caller
 *   is responsible for removing the directory.
 */
function createSessionFile(params?: { history?: Array<{ role: "user"; content: string }> }) {
  const tempPrefix = path.join(os.tmpdir(), "openclaw-cli-hooks-");
  const dir = fs.mkdtempSync(tempPrefix);
  const sessionFile = path.join(dir, "session.jsonl");

  // Header record: the first (and initially only) line of the JSONL file.
  const header = {
    type: "session",
    version: CURRENT_SESSION_VERSION,
    id: "session-test",
    timestamp: new Date(0).toISOString(),
    cwd: dir,
  };
  fs.writeFileSync(sessionFile, `${JSON.stringify(header)}\n`, "utf-8");

  const manager = SessionManager.open(sessionFile);
  const history = params?.history ?? [];
  history.forEach(({ role, content }) => {
    manager.appendMessage({ role, content, timestamp: Date.now() });
  });

  return { dir, sessionFile };
}
function buildPreparedContext(params?: {
sessionKey?: string;
cliSessionId?: string;
@@ -67,6 +109,7 @@ function buildPreparedContext(params?: {
describe("runCliAgent reliability", () => {
// Runs after every test in this suite so state set up by one test is reset before the next.
afterEach(() => {
// Reset the reply-run registry through its testing helper.
replyRunTesting.resetReplyRunRegistry();
// Reset the mocked getGlobalHookRunner, discarding any stub runner a test installed on it.
mockGetGlobalHookRunner.mockReset();
});
it("fails with timeout when no-output watchdog trips", async () => {
@@ -312,6 +355,150 @@ describe("runCliAgent reliability", () => {
expect(result.meta.finalAssistantVisibleText).toBe("goodbye from cli");
expect(result.meta.finalAssistantRawText).toBe("hello from cli");
});
// Success path: a CLI run that exits 0 with text on stdout must invoke the llm_input,
// llm_output, and agent_end hook runners exactly once each with the expected payloads.
it("emits llm_input, llm_output, and agent_end hooks for successful CLI runs", async () => {
// Stub runner: hasHooks answers true only for the three hooks under test.
const hookRunner = {
hasHooks: vi.fn((hookName: string) =>
["llm_input", "llm_output", "agent_end"].includes(hookName),
),
runLlmInput: vi.fn(async () => undefined),
runLlmOutput: vi.fn(async () => undefined),
runAgentEnd: vi.fn(async () => undefined),
};
mockGetGlobalHookRunner.mockReturnValue(hookRunner as never);
// Session file with a header only (no history entries are requested).
const { dir, sessionFile } = createSessionFile();
// Simulate a CLI process that exits cleanly and prints the assistant reply on stdout.
supervisorSpawnMock.mockResolvedValueOnce(
createManagedRun({
reason: "exit",
exitCode: 0,
exitSignal: null,
durationMs: 50,
stdout: "hello from cli",
stderr: "",
timedOut: false,
noOutputTimedOut: false,
}),
);
try {
// Override the prepared params so the hook context fields asserted below are populated.
await runPreparedCliAgent({
...buildPreparedContext(),
params: {
...buildPreparedContext().params,
sessionFile,
workspaceDir: dir,
sessionKey: "agent:main:main",
agentId: "main",
messageProvider: "acp",
messageChannel: "telegram",
trigger: "user",
},
});
// Poll until every hook has fired once; the test does not assume the hooks have
// already been delivered by the time runPreparedCliAgent resolves.
await vi.waitFor(() => {
expect(hookRunner.runLlmInput).toHaveBeenCalledTimes(1);
expect(hookRunner.runLlmOutput).toHaveBeenCalledTimes(1);
expect(hookRunner.runAgentEnd).toHaveBeenCalledTimes(1);
});
// NOTE(review): runId/sessionId/provider/model/prompt/systemPrompt values presumably
// mirror buildPreparedContext defaults, which are outside this view — confirm.
expect(hookRunner.runLlmInput).toHaveBeenCalledWith(
expect.objectContaining({
runId: "run-2",
sessionId: "s1",
provider: "codex-cli",
model: "gpt-5.4",
prompt: "hi",
systemPrompt: "You are a helpful assistant.",
historyMessages: expect.any(Array),
imagesCount: 0,
}),
// Second argument is the hook context; channelId is expected to carry the
// messageChannel value ("telegram") rather than messageProvider ("acp").
expect.objectContaining({
runId: "run-2",
agentId: "main",
sessionKey: "agent:main:main",
sessionId: "s1",
workspaceDir: dir,
messageProvider: "acp",
trigger: "user",
channelId: "telegram",
}),
);
// llm_output must expose the stdout text both as assistantTexts and as a
// structured assistant message in lastAssistant.
expect(hookRunner.runLlmOutput).toHaveBeenCalledWith(
expect.objectContaining({
runId: "run-2",
sessionId: "s1",
provider: "codex-cli",
model: "gpt-5.4",
assistantTexts: ["hello from cli"],
lastAssistant: expect.objectContaining({
role: "assistant",
content: [{ type: "text", text: "hello from cli" }],
provider: "codex-cli",
model: "gpt-5.4",
}),
}),
expect.any(Object),
);
// With no session history, the agent_end transcript is the user prompt followed by
// the assistant reply.
expect(hookRunner.runAgentEnd).toHaveBeenCalledWith(
expect.objectContaining({
success: true,
messages: [
{ role: "user", content: "hi", timestamp: expect.any(Number) },
expect.objectContaining({
role: "assistant",
content: [{ type: "text", text: "hello from cli" }],
}),
],
}),
expect.any(Object),
);
} finally {
// Always remove the temporary session directory, even when an assertion fails.
fs.rmSync(dir, { recursive: true, force: true });
}
});
// Failure path: when the CLI exits non-zero, llm_input and agent_end must still fire once,
// llm_output must not fire, and agent_end must report success: false with the error text.
it("emits agent_end with failure details when the CLI run fails", async () => {
// Stub runner: hasHooks answers true only for llm_input and agent_end in this test.
const hookRunner = {
hasHooks: vi.fn((hookName: string) => ["llm_input", "agent_end"].includes(hookName)),
runLlmInput: vi.fn(async () => undefined),
runLlmOutput: vi.fn(async () => undefined),
runAgentEnd: vi.fn(async () => undefined),
};
mockGetGlobalHookRunner.mockReturnValue(hookRunner as never);
// Simulate a CLI process that exits with code 1 and an error message on stderr.
supervisorSpawnMock.mockResolvedValueOnce(
createManagedRun({
reason: "exit",
exitCode: 1,
exitSignal: null,
durationMs: 50,
stdout: "",
stderr: "rate limit exceeded",
timedOut: false,
noOutputTimedOut: false,
}),
);
// The run must still reject; emitting the hook does not swallow the error.
await expect(runPreparedCliAgent(buildPreparedContext())).rejects.toThrow(
"rate limit exceeded",
);
// Poll until the expected hooks have fired; llm_output must never have been called.
await vi.waitFor(() => {
expect(hookRunner.runLlmInput).toHaveBeenCalledTimes(1);
expect(hookRunner.runLlmOutput).not.toHaveBeenCalled();
expect(hookRunner.runAgentEnd).toHaveBeenCalledTimes(1);
});
// On failure the transcript holds only the user prompt (no assistant message).
// NOTE(review): this assumes the default prepared context yields no history messages — confirm.
expect(hookRunner.runAgentEnd).toHaveBeenCalledWith(
expect.objectContaining({
success: false,
error: "rate limit exceeded",
messages: [{ role: "user", content: "hi", timestamp: expect.any(Number) }],
}),
expect.any(Object),
);
});
});
describe("resolveCliNoOutputTimeoutMs", () => {

View File

@@ -1,9 +1,56 @@
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { formatErrorMessage } from "../infra/errors.js";
import type { PreparedCliRunContext, RunCliAgentParams } from "./cli-runner/types.js";
import { FailoverError, isFailoverError, resolveFailoverStatus } from "./failover-error.js";
import {
runAgentHarnessAgentEndHook,
runAgentHarnessLlmInputHook,
runAgentHarnessLlmOutputHook,
} from "./harness/lifecycle-hook-helpers.js";
import { classifyFailoverReason, isFailoverErrorMessage } from "./pi-embedded-helpers.js";
import type { EmbeddedPiRunResult } from "./pi-embedded-runner.js";
/**
 * Reads the messages recorded in a session file for use in lifecycle hook payloads.
 *
 * Only entries whose `type` is `"message"` contribute; their `message` value is collected
 * in file order. This is best-effort: any error raised while opening or reading the
 * session yields an empty list instead of propagating.
 *
 * @param sessionFile - Path of the session file to open.
 * @returns The message payloads found in the session, or `[]` on any failure.
 */
function loadCliHookHistoryMessages(sessionFile: string): unknown[] {
  const collected: unknown[] = [];
  try {
    for (const entry of SessionManager.open(sessionFile).getEntries()) {
      if (entry.type === "message") {
        collected.push(entry.message as unknown);
      }
    }
  } catch {
    // Discard anything gathered so far; a partially read history is not reported.
    return [];
  }
  return collected;
}
/**
 * Wraps a prompt string in a user-role message for lifecycle hook payloads.
 *
 * @param prompt - The prompt text, used verbatim as the message content.
 * @returns An object with `role: "user"`, the prompt as `content`, and the current
 *   `Date.now()` value as `timestamp`.
 */
function buildCliHookUserMessage(prompt: string): unknown {
  const userMessage = { role: "user", content: prompt, timestamp: Date.now() };
  return userMessage;
}
/**
 * Synthesizes an assistant-role message from plain CLI output text for lifecycle hook payloads.
 *
 * The text becomes a single `{ type: "text" }` content part. `api` is always `"responses"`
 * and `stopReason` is always `"stop"`. The `usage` property is present only when a truthy
 * `params.usage` is supplied, and `timestamp` is the current `Date.now()` value.
 *
 * @param params - Assistant text, provider and model identifiers, and optional token usage.
 * @returns The assembled assistant message object.
 */
function buildCliHookAssistantMessage(params: {
  text: string;
  provider: string;
  model: string;
  usage?: {
    input?: number;
    output?: number;
    cacheRead?: number;
    cacheWrite?: number;
    total?: number;
  };
}): unknown {
  const { text, provider, model, usage } = params;
  // Properties are assigned in a fixed order so the serialized key order stays stable.
  const message: Record<string, unknown> = {
    role: "assistant",
    content: [{ type: "text", text }],
    api: "responses",
    provider,
    model,
  };
  if (usage) {
    message.usage = usage;
  }
  message.stopReason = "stop";
  message.timestamp = Date.now();
  return message;
}
export async function runCliAgent(params: RunCliAgentParams): Promise<EmbeddedPiRunResult> {
const { prepareCliRunContext } = await import("./cli-runner/prepare.runtime.js");
const context = await prepareCliRunContext(params);
@@ -15,6 +62,60 @@ export async function runPreparedCliAgent(
): Promise<EmbeddedPiRunResult> {
const { executePreparedCliRun } = await import("./cli-runner/execute.runtime.js");
const { params } = context;
const historyMessages = loadCliHookHistoryMessages(params.sessionFile);
const llmInputEvent = {
runId: params.runId,
sessionId: params.sessionId,
provider: params.provider,
model: context.modelId,
systemPrompt: context.systemPrompt,
prompt: params.prompt,
historyMessages,
imagesCount: params.images?.length ?? 0,
} as const;
const hookContext = {
runId: params.runId,
agentId: params.agentId,
sessionKey: params.sessionKey,
sessionId: params.sessionId,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider,
trigger: params.trigger,
channelId: params.messageChannel ?? params.messageProvider,
} as const;
/**
 * Executes one CLI attempt and reports it through the LLM lifecycle hook helpers.
 *
 * The llm_input hook helper is invoked before the CLI run starts and the llm_output hook
 * helper after it has produced output; neither call is awaited here. If
 * `executePreparedCliRun` rejects, the rejection propagates and the llm_output hook helper
 * is not invoked.
 *
 * @param cliSessionIdToUse - CLI session id forwarded to `executePreparedCliRun`; may be omitted.
 * @returns The raw run output, the trimmed assistant text, and the synthesized assistant
 *   message (`undefined` when the trimmed text is empty).
 */
const runCliAttempt = async (cliSessionIdToUse?: string) => {
  runAgentHarnessLlmInputHook({ event: llmInputEvent, ctx: hookContext });

  const output = await executePreparedCliRun(context, cliSessionIdToUse);
  const assistantText = output.text.trim();
  const hasAssistantText = assistantText.length > 0;

  // An empty reply yields no assistant texts and no synthesized assistant message.
  const assistantTexts: string[] = hasAssistantText ? [assistantText] : [];
  let lastAssistant: unknown;
  if (hasAssistantText) {
    lastAssistant = buildCliHookAssistantMessage({
      text: assistantText,
      provider: params.provider,
      model: context.modelId,
      usage: output.usage,
    });
  }

  runAgentHarnessLlmOutputHook({
    event: {
      runId: params.runId,
      sessionId: params.sessionId,
      provider: params.provider,
      model: context.modelId,
      assistantTexts,
      // Optional fields are omitted entirely rather than set to undefined.
      ...(lastAssistant ? { lastAssistant } : {}),
      ...(output.usage ? { usage: output.usage } : {}),
    },
    ctx: hookContext,
  });

  return { output, assistantText, lastAssistant };
};
const buildCliRunResult = (resultParams: {
output: Awaited<ReturnType<typeof executePreparedCliRun>>;
effectiveCliSessionId?: string;
@@ -93,8 +194,20 @@ export async function runPreparedCliAgent(
// Try with the provided CLI session ID first
try {
try {
const output = await executePreparedCliRun(context, context.reusableCliSession.sessionId);
const { output, lastAssistant } = await runCliAttempt(context.reusableCliSession.sessionId);
const effectiveCliSessionId = output.sessionId ?? context.reusableCliSession.sessionId;
runAgentHarnessAgentEndHook({
event: {
messages: [
...historyMessages,
buildCliHookUserMessage(params.prompt),
...(lastAssistant ? [lastAssistant] : []),
],
success: true,
durationMs: Date.now() - context.started,
},
ctx: hookContext,
});
return buildCliRunResult({ output, effectiveCliSessionId });
} catch (err) {
if (isFailoverError(err)) {
@@ -106,23 +219,63 @@ export async function runPreparedCliAgent(
// We'll need to modify the caller to handle this case
// For now, retry without the session ID to create a new session
const output = await executePreparedCliRun(context, undefined);
const { output, lastAssistant } = await runCliAttempt(undefined);
const effectiveCliSessionId = output.sessionId;
runAgentHarnessAgentEndHook({
event: {
messages: [
...historyMessages,
buildCliHookUserMessage(params.prompt),
...(lastAssistant ? [lastAssistant] : []),
],
success: true,
durationMs: Date.now() - context.started,
},
ctx: hookContext,
});
return buildCliRunResult({ output, effectiveCliSessionId });
}
runAgentHarnessAgentEndHook({
event: {
messages: [...historyMessages, buildCliHookUserMessage(params.prompt)],
success: false,
error: formatErrorMessage(err),
durationMs: Date.now() - context.started,
},
ctx: hookContext,
});
throw err;
}
const message = formatErrorMessage(err);
if (isFailoverErrorMessage(message, { provider: params.provider })) {
const reason = classifyFailoverReason(message, { provider: params.provider }) ?? "unknown";
const status = resolveFailoverStatus(reason);
throw new FailoverError(message, {
const failoverError = new FailoverError(message, {
reason,
provider: params.provider,
model: context.modelId,
status,
});
runAgentHarnessAgentEndHook({
event: {
messages: [...historyMessages, buildCliHookUserMessage(params.prompt)],
success: false,
error: message,
durationMs: Date.now() - context.started,
},
ctx: hookContext,
});
throw failoverError;
}
runAgentHarnessAgentEndHook({
event: {
messages: [...historyMessages, buildCliHookUserMessage(params.prompt)],
success: false,
error: message,
durationMs: Date.now() - context.started,
},
ctx: hookContext,
});
throw err;
}
} finally {