mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 13:50:49 +00:00
feat(codex): add tool hook parity (#70307)
* feat(codex): add tool hook parity
* fix(codex): stabilize tool hook parity
* fix(codex): tighten transcript hook typing
* fix(codex): preserve mirrored transcript idempotency
* fix(codex): normalize tool hook context
This commit is contained in:
@@ -1,6 +1,13 @@
|
||||
import type { AgentToolResult } from "@mariozechner/pi-agent-core";
|
||||
import type { AnyAgentTool } from "openclaw/plugin-sdk/agent-harness";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
initializeGlobalHookRunner,
|
||||
resetGlobalHookRunner,
|
||||
} from "../../../../src/plugins/hook-runner-global.js";
|
||||
import { createMockPluginRegistry } from "../../../../src/plugins/hooks.test-helpers.js";
|
||||
import { createEmptyPluginRegistry } from "../../../../src/plugins/registry.js";
|
||||
import { setActivePluginRegistry } from "../../../../src/plugins/runtime.js";
|
||||
import { createCodexDynamicToolBridge } from "./dynamic-tools.js";
|
||||
import type { JsonValue } from "./protocol.js";
|
||||
|
||||
@@ -58,6 +65,11 @@ async function handleMessageToolCall(
|
||||
});
|
||||
}
|
||||
|
||||
// After every test: reset the global hook runner and swap an empty plugin
// registry in as the active one.
afterEach(() => {
  resetGlobalHookRunner();
  const blankRegistry = createEmptyPluginRegistry();
  setActivePluginRegistry(blankRegistry);
});
|
||||
|
||||
describe("createCodexDynamicToolBridge", () => {
|
||||
it.each([
|
||||
{ toolName: "tts", mediaUrl: "/tmp/reply.opus", audioAsVoice: true },
|
||||
@@ -152,4 +164,82 @@ describe("createCodexDynamicToolBridge", () => {
|
||||
messagingToolSentTargets: [],
|
||||
});
|
||||
});
|
||||
|
||||
// Registers a fake "tokenjuice" extension factory in the active plugin registry
// whose tool_result handler swaps the result content for "<toolName> compacted",
// then checks that the bridge response carries the rewritten text.
it("applies codex app-server tool_result extensions from the active plugin registry", async () => {
  // Minimal shape of the runtime object handed to the extension factory.
  type ToolResultRuntime = {
    on: (
      event: "tool_result",
      handler: (event: any) => Promise<{ result: AgentToolResult<unknown> }>,
    ) => void;
  };

  const pluginRegistry = createEmptyPluginRegistry();
  const compactingFactory = async (runtime: ToolResultRuntime) => {
    runtime.on("tool_result", async (event) => {
      return {
        result: {
          // Keep every other field of the raw result; only the content is replaced.
          ...event.result,
          content: [{ type: "text", text: `${event.toolName} compacted` }],
        },
      };
    });
  };
  pluginRegistry.codexAppServerExtensionFactories.push({
    pluginId: "tokenjuice",
    pluginName: "Tokenjuice",
    rawFactory: compactingFactory,
    factory: compactingFactory,
    source: "test",
  });
  setActivePluginRegistry(pluginRegistry);

  // createBridgeWithToolResult / expectInputText are helpers defined elsewhere in this test file.
  const toolBridge = createBridgeWithToolResult("exec", {
    content: [{ type: "text", text: "raw output" }],
    details: {},
  });

  const response = await toolBridge.handleToolCall({
    threadId: "thread-1",
    turnId: "turn-1",
    callId: "call-1",
    tool: "exec",
    arguments: { command: "git status" },
  });

  expect(response).toEqual(expectInputText("exec compacted"));
});
|
||||
|
||||
// Installs a mock after_tool_call hook, runs one successful "exec" tool call
// through the bridge, and waits until the hook has been invoked with the
// expected event and context arguments.
it("fires after_tool_call for successful codex tool executions", async () => {
  const hookSpy = vi.fn();
  initializeGlobalHookRunner(
    createMockPluginRegistry([{ hookName: "after_tool_call", handler: hookSpy }]),
  );

  const toolBridge = createBridgeWithToolResult("exec", {
    content: [{ type: "text", text: "done" }],
    details: {},
  });

  await toolBridge.handleToolCall({
    threadId: "thread-1",
    turnId: "turn-1",
    callId: "call-1",
    tool: "exec",
    arguments: { command: "pwd" },
  });

  // First hook argument: the tool call event, including the tool result.
  const expectedEvent = expect.objectContaining({
    toolName: "exec",
    toolCallId: "call-1",
    params: { command: "pwd" },
    result: expect.objectContaining({
      content: [{ type: "text", text: "done" }],
      details: {},
    }),
  });
  // Second hook argument: the hook context.
  const expectedContext = expect.objectContaining({
    toolName: "exec",
    toolCallId: "call-1",
  });

  // vi.waitFor retries the assertion until it stops throwing.
  await vi.waitFor(() => {
    expect(hookSpy).toHaveBeenCalledWith(expectedEvent, expectedContext);
  });
});
|
||||
});
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
import type { AgentToolResult } from "@mariozechner/pi-agent-core";
|
||||
import type { ImageContent, TextContent } from "@mariozechner/pi-ai";
|
||||
import {
|
||||
createCodexAppServerToolResultExtensionRunner,
|
||||
extractToolResultMediaArtifact,
|
||||
filterToolResultMediaUrls,
|
||||
isMessagingTool,
|
||||
isMessagingToolSendAction,
|
||||
runAgentHarnessAfterToolCallHook,
|
||||
type AnyAgentTool,
|
||||
type MessagingToolSend,
|
||||
} from "openclaw/plugin-sdk/agent-harness";
|
||||
@@ -33,6 +35,12 @@ export type CodexDynamicToolBridge = {
|
||||
export function createCodexDynamicToolBridge(params: {
|
||||
tools: AnyAgentTool[];
|
||||
signal: AbortSignal;
|
||||
hookContext?: {
|
||||
agentId?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
runId?: string;
|
||||
};
|
||||
}): CodexDynamicToolBridge {
|
||||
const toolMap = new Map(params.tools.map((tool) => [tool.name, tool]));
|
||||
const telemetry: CodexDynamicToolBridge["telemetry"] = {
|
||||
@@ -43,6 +51,7 @@ export function createCodexDynamicToolBridge(params: {
|
||||
toolMediaUrls: [],
|
||||
toolAudioAsVoice: false,
|
||||
};
|
||||
const extensionRunner = createCodexAppServerToolResultExtensionRunner(params.hookContext ?? {});
|
||||
|
||||
return {
|
||||
specs: params.tools.map((tool) => ({
|
||||
@@ -60,9 +69,18 @@ export function createCodexDynamicToolBridge(params: {
|
||||
};
|
||||
}
|
||||
const args = jsonObjectToRecord(call.arguments);
|
||||
const startedAt = Date.now();
|
||||
try {
|
||||
const preparedArgs = tool.prepareArguments ? tool.prepareArguments(args) : args;
|
||||
const result = await tool.execute(call.callId, preparedArgs, params.signal);
|
||||
const rawResult = await tool.execute(call.callId, preparedArgs, params.signal);
|
||||
const result = await extensionRunner.applyToolResultExtensions({
|
||||
threadId: call.threadId,
|
||||
turnId: call.turnId,
|
||||
toolCallId: call.callId,
|
||||
toolName: tool.name,
|
||||
args,
|
||||
result: rawResult,
|
||||
});
|
||||
collectToolTelemetry({
|
||||
toolName: tool.name,
|
||||
args,
|
||||
@@ -70,6 +88,17 @@ export function createCodexDynamicToolBridge(params: {
|
||||
telemetry,
|
||||
isError: false,
|
||||
});
|
||||
void runAgentHarnessAfterToolCallHook({
|
||||
toolName: tool.name,
|
||||
toolCallId: call.callId,
|
||||
runId: params.hookContext?.runId,
|
||||
agentId: params.hookContext?.agentId,
|
||||
sessionId: params.hookContext?.sessionId,
|
||||
sessionKey: params.hookContext?.sessionKey,
|
||||
startArgs: args,
|
||||
result,
|
||||
startedAt,
|
||||
});
|
||||
return {
|
||||
contentItems: result.content.flatMap(convertToolContent),
|
||||
success: true,
|
||||
@@ -82,6 +111,17 @@ export function createCodexDynamicToolBridge(params: {
|
||||
telemetry,
|
||||
isError: true,
|
||||
});
|
||||
void runAgentHarnessAfterToolCallHook({
|
||||
toolName: tool.name,
|
||||
toolCallId: call.callId,
|
||||
runId: params.hookContext?.runId,
|
||||
agentId: params.hookContext?.agentId,
|
||||
sessionId: params.hookContext?.sessionId,
|
||||
sessionKey: params.hookContext?.sessionKey,
|
||||
startArgs: args,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
startedAt,
|
||||
});
|
||||
return {
|
||||
contentItems: [
|
||||
{
|
||||
|
||||
@@ -100,6 +100,12 @@ export async function runCodexAppServerAttempt(
|
||||
const toolBridge = createCodexDynamicToolBridge({
|
||||
tools,
|
||||
signal: runAbortController.signal,
|
||||
hookContext: {
|
||||
agentId: sessionAgentId,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: sandboxSessionKey,
|
||||
runId: params.runId,
|
||||
},
|
||||
});
|
||||
const historyMessages = readMirroredSessionHistoryMessages(params.sessionFile);
|
||||
const promptBuild = await resolveAgentHarnessBeforePromptBuildResult({
|
||||
@@ -279,7 +285,9 @@ export async function runCodexAppServerAttempt(
|
||||
const result = activeProjector.buildResult(toolBridge.telemetry, { yieldDetected });
|
||||
await mirrorTranscriptBestEffort({
|
||||
params,
|
||||
agentId: sessionAgentId,
|
||||
result,
|
||||
sessionKey: sandboxSessionKey,
|
||||
threadId: thread.threadId,
|
||||
turnId: activeTurnId,
|
||||
});
|
||||
@@ -514,14 +522,17 @@ function readMirroredSessionHistoryMessages(sessionFile: string): unknown[] {
|
||||
|
||||
async function mirrorTranscriptBestEffort(params: {
|
||||
params: EmbeddedRunAttemptParams;
|
||||
agentId?: string;
|
||||
result: EmbeddedRunAttemptResult;
|
||||
sessionKey?: string;
|
||||
threadId: string;
|
||||
turnId: string;
|
||||
}): Promise<void> {
|
||||
try {
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile: params.params.sessionFile,
|
||||
sessionKey: params.params.sessionKey,
|
||||
agentId: params.agentId,
|
||||
sessionKey: params.sessionKey,
|
||||
messages: params.result.messagesSnapshot,
|
||||
idempotencyScope: `codex-app-server:${params.threadId}:${params.turnId}`,
|
||||
});
|
||||
|
||||
@@ -1,92 +1,186 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
castAgentMessage,
|
||||
makeAgentAssistantMessage,
|
||||
makeAgentUserMessage,
|
||||
} from "../../../../src/agents/test-helpers/agent-message-fixtures.js";
|
||||
import {
|
||||
initializeGlobalHookRunner,
|
||||
resetGlobalHookRunner,
|
||||
} from "../../../../src/plugins/hook-runner-global.js";
|
||||
import { createMockPluginRegistry } from "../../../../src/plugins/hooks.test-helpers.js";
|
||||
import { mirrorCodexAppServerTranscript } from "./transcript-mirror.js";
|
||||
|
||||
let tempDir: string;
|
||||
const tempDirs: string[] = [];
|
||||
|
||||
function assistantMessage(text: string, timestamp: number): AgentMessage {
|
||||
return {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text }],
|
||||
api: "openai-codex-responses",
|
||||
provider: "openai-codex",
|
||||
model: "gpt-5.4-codex",
|
||||
usage: {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
totalTokens: 0,
|
||||
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||
},
|
||||
stopReason: "stop",
|
||||
timestamp,
|
||||
};
|
||||
// After every test: reset the global hook runner, then delete every temporary
// directory recorded in `tempDirs`.
afterEach(async () => {
  resetGlobalHookRunner();
  // splice(0) empties the shared list up front and hands back its former contents.
  const pendingDirs = tempDirs.splice(0);
  for (const dirPath of pendingDirs) {
    await fs.rm(dirPath, { recursive: true, force: true });
  }
});
|
||||
|
||||
/**
 * Creates a fresh temporary directory under the OS temp dir, records it in
 * `tempDirs`, and returns the path of a `session.jsonl` inside it.
 * The session file itself is not created here.
 */
async function createTempSessionFile(): Promise<string> {
  const dirPrefix = path.join(os.tmpdir(), "openclaw-codex-transcript-");
  const sessionDir = await fs.mkdtemp(dirPrefix);
  tempDirs.push(sessionDir);
  return path.join(sessionDir, "session.jsonl");
}
|
||||
|
||||
describe("mirrorCodexAppServerTranscript", () => {
|
||||
beforeEach(async () => {
|
||||
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-transcript-"));
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await fs.rm(tempDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("mirrors user and assistant messages into the PI transcript", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
it("mirrors user and assistant messages into the Pi transcript", async () => {
|
||||
const sessionFile = await createTempSessionFile();
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionKey: "agent:main:session-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [
|
||||
{ role: "user", content: "hello", timestamp: 1 },
|
||||
assistantMessage("Codex plan:\ninspect", 2),
|
||||
assistantMessage("hi", 3),
|
||||
makeAgentUserMessage({
|
||||
content: [{ type: "text", text: "hello" }],
|
||||
timestamp: Date.now(),
|
||||
}),
|
||||
makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "hi there" }],
|
||||
timestamp: Date.now() + 1,
|
||||
}),
|
||||
],
|
||||
idempotencyScope: "scope-1",
|
||||
});
|
||||
|
||||
const records = (await fs.readFile(sessionFile, "utf8"))
|
||||
.trim()
|
||||
.split("\n")
|
||||
.map((line) => JSON.parse(line) as { type?: string; message?: { role?: string } });
|
||||
expect(records[0]?.type).toBe("session");
|
||||
expect(records.slice(1).map((record) => record.message?.role)).toEqual([
|
||||
"user",
|
||||
"assistant",
|
||||
"assistant",
|
||||
]);
|
||||
const raw = await fs.readFile(sessionFile, "utf8");
|
||||
expect(raw).toContain('"role":"user"');
|
||||
expect(raw).toContain('"content":[{"type":"text","text":"hello"}]');
|
||||
expect(raw).toContain('"role":"assistant"');
|
||||
expect(raw).toContain('"content":[{"type":"text","text":"hi there"}]');
|
||||
expect(raw).toContain('"idempotencyKey":"scope-1:user:0"');
|
||||
expect(raw).toContain('"idempotencyKey":"scope-1:assistant:1"');
|
||||
});
|
||||
|
||||
it("deduplicates app-server turn mirrors by idempotency scope", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const sessionFile = await createTempSessionFile();
|
||||
const messages = [
|
||||
{ role: "user" as const, content: "hello", timestamp: 1 },
|
||||
assistantMessage("hi", 2),
|
||||
];
|
||||
makeAgentUserMessage({
|
||||
content: [{ type: "text", text: "hello" }],
|
||||
timestamp: Date.now(),
|
||||
}),
|
||||
makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "hi there" }],
|
||||
timestamp: Date.now() + 1,
|
||||
}),
|
||||
] as const;
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
messages,
|
||||
idempotencyScope: "codex-app-server:thread-1:turn-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [...messages],
|
||||
idempotencyScope: "scope-1",
|
||||
});
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
messages,
|
||||
idempotencyScope: "codex-app-server:thread-1:turn-1",
|
||||
sessionKey: "session-1",
|
||||
messages: [...messages],
|
||||
idempotencyScope: "scope-1",
|
||||
});
|
||||
|
||||
const records = (await fs.readFile(sessionFile, "utf8"))
|
||||
.trim()
|
||||
.split("\n")
|
||||
.map((line) => JSON.parse(line) as { message?: { role?: string; idempotencyKey?: string } });
|
||||
expect(records.slice(1).map((record) => record.message?.role)).toEqual(["user", "assistant"]);
|
||||
expect(records.slice(1).map((record) => record.message?.idempotencyKey)).toEqual([
|
||||
"codex-app-server:thread-1:turn-1:user:0",
|
||||
"codex-app-server:thread-1:turn-1:assistant:1",
|
||||
]);
|
||||
.filter(Boolean)
|
||||
.map((line) => JSON.parse(line) as { type?: string; message?: { role?: string } });
|
||||
expect(records.slice(1)).toHaveLength(2);
|
||||
});
|
||||
|
||||
// Installs a before_message_write hook that rewrites the message content, mirrors
// one assistant message, and checks that the transcript file contains the hooked
// content together with the scope-derived idempotency key.
it("runs before_message_write before appending mirrored transcript messages", async () => {
  initializeGlobalHookRunner(
    createMockPluginRegistry([
      {
        hookName: "before_message_write",
        handler: (event) => {
          const incoming = (event as { message: unknown }).message as Record<string, unknown>;
          return {
            message: castAgentMessage({
              ...incoming,
              content: [{ type: "text", text: "hello [hooked]" }],
            }),
          };
        },
      },
    ]),
  );
  const transcriptPath = await createTempSessionFile();

  const assistantReply = makeAgentAssistantMessage({
    content: [{ type: "text", text: "hello" }],
    timestamp: Date.now(),
  });
  await mirrorCodexAppServerTranscript({
    sessionFile: transcriptPath,
    sessionKey: "session-1",
    messages: [assistantReply],
    idempotencyScope: "scope-1",
  });

  const transcriptText = await fs.readFile(transcriptPath, "utf8");
  expect(transcriptText).toContain('"content":[{"type":"text","text":"hello [hooked]"}]');
  expect(transcriptText).toContain('"idempotencyKey":"scope-1:assistant:0"');
});
|
||||
|
||||
// Installs a before_message_write hook that overwrites `idempotencyKey` on the
// message, mirrors one assistant message, and checks that the written transcript
// keeps the scope-derived key and never contains the hook's replacement.
it("preserves the computed idempotency key when hooks rewrite message keys", async () => {
  initializeGlobalHookRunner(
    createMockPluginRegistry([
      {
        hookName: "before_message_write",
        handler: (event) => {
          const incoming = (event as { message: unknown }).message as Record<string, unknown>;
          return {
            message: castAgentMessage({
              ...incoming,
              idempotencyKey: "hook-rewritten-key",
            }),
          };
        },
      },
    ]),
  );
  const transcriptPath = await createTempSessionFile();

  const assistantReply = makeAgentAssistantMessage({
    content: [{ type: "text", text: "hello" }],
    timestamp: Date.now(),
  });
  await mirrorCodexAppServerTranscript({
    sessionFile: transcriptPath,
    sessionKey: "session-1",
    messages: [assistantReply],
    idempotencyScope: "scope-1",
  });

  const transcriptText = await fs.readFile(transcriptPath, "utf8");
  expect(transcriptText).toContain('"idempotencyKey":"scope-1:assistant:0"');
  expect(transcriptText).not.toContain("hook-rewritten-key");
});
|
||||
|
||||
// Installs a before_message_write hook that always answers `{ block: true }`,
// mirrors one assistant message, and checks that reading the session file
// afterwards fails with ENOENT.
it("respects before_message_write blocking decisions", async () => {
  initializeGlobalHookRunner(
    createMockPluginRegistry([
      {
        hookName: "before_message_write",
        handler: () => {
          return { block: true };
        },
      },
    ]),
  );
  const transcriptPath = await createTempSessionFile();

  const blockedReply = makeAgentAssistantMessage({
    content: [{ type: "text", text: "should not persist" }],
    timestamp: Date.now(),
  });
  await mirrorCodexAppServerTranscript({
    sessionFile: transcriptPath,
    sessionKey: "session-1",
    messages: [blockedReply],
    idempotencyScope: "scope-1",
  });

  const readAttempt = fs.readFile(transcriptPath, "utf8");
  await expect(readAttempt).rejects.toMatchObject({ code: "ENOENT" });
});
|
||||
});
|
||||
|
||||
@@ -5,11 +5,13 @@ import { SessionManager } from "@mariozechner/pi-coding-agent";
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
emitSessionTranscriptUpdate,
|
||||
runAgentHarnessBeforeMessageWriteHook,
|
||||
} from "openclaw/plugin-sdk/agent-harness";
|
||||
|
||||
export async function mirrorCodexAppServerTranscript(params: {
|
||||
sessionFile: string;
|
||||
sessionKey?: string;
|
||||
agentId?: string;
|
||||
messages: AgentMessage[];
|
||||
idempotencyScope?: string;
|
||||
}): Promise<void> {
|
||||
@@ -39,7 +41,21 @@ export async function mirrorCodexAppServerTranscript(params: {
|
||||
...message,
|
||||
...(idempotencyKey ? { idempotencyKey } : {}),
|
||||
} as Parameters<SessionManager["appendMessage"]>[0];
|
||||
sessionManager.appendMessage(transcriptMessage);
|
||||
const nextMessage = runAgentHarnessBeforeMessageWriteHook({
|
||||
message: transcriptMessage,
|
||||
agentId: params.agentId,
|
||||
sessionKey: params.sessionKey,
|
||||
});
|
||||
if (!nextMessage) {
|
||||
continue;
|
||||
}
|
||||
const messageToAppend = (idempotencyKey
|
||||
? {
|
||||
...(nextMessage as unknown as Record<string, unknown>),
|
||||
idempotencyKey,
|
||||
}
|
||||
: nextMessage) as unknown as Parameters<SessionManager["appendMessage"]>[0];
|
||||
sessionManager.appendMessage(messageToAppend);
|
||||
if (idempotencyKey) {
|
||||
existingIdempotencyKeys.add(idempotencyKey);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user