mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 11:40:42 +00:00
fix(codex): bound dynamic tool bridge responses
This commit is contained in:
@@ -327,6 +327,44 @@ describe("CodexAppServerClient", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("fails closed when a dynamic tool server request handler hangs", async () => {
|
||||
vi.useFakeTimers();
|
||||
const warn = vi.spyOn(embeddedAgentLog, "warn").mockImplementation(() => undefined);
|
||||
const harness = createClientHarness();
|
||||
clients.push(harness.client);
|
||||
harness.client.addRequestHandler((request) => {
|
||||
if (request.method === "item/tool/call") {
|
||||
return new Promise<never>(() => undefined);
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
|
||||
harness.send({ id: "srv-timeout", method: "item/tool/call", params: { tool: "message" } });
|
||||
await vi.advanceTimersByTimeAsync(__testing.CODEX_DYNAMIC_TOOL_SERVER_REQUEST_TIMEOUT_MS);
|
||||
await vi.waitFor(() => expect(harness.writes.length).toBe(1));
|
||||
|
||||
expect(JSON.parse(harness.writes[0] ?? "{}")).toEqual({
|
||||
id: "srv-timeout",
|
||||
result: {
|
||||
success: false,
|
||||
contentItems: [
|
||||
{
|
||||
type: "inputText",
|
||||
text: `OpenClaw dynamic tool call timed out after ${__testing.CODEX_DYNAMIC_TOOL_SERVER_REQUEST_TIMEOUT_MS}ms before sending a response to Codex.`,
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
expect(warn).toHaveBeenCalledWith(
|
||||
"codex app-server server request timed out",
|
||||
expect.objectContaining({
|
||||
id: "srv-timeout",
|
||||
method: "item/tool/call",
|
||||
timeoutMs: __testing.CODEX_DYNAMIC_TOOL_SERVER_REQUEST_TIMEOUT_MS,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("fails closed for unhandled native app-server approvals", async () => {
|
||||
const harness = createClientHarness();
|
||||
clients.push(harness.client);
|
||||
|
||||
@@ -25,6 +25,7 @@ import { MIN_CODEX_APP_SERVER_VERSION } from "./version.js";
|
||||
|
||||
export { MIN_CODEX_APP_SERVER_VERSION } from "./version.js";
|
||||
const CODEX_APP_SERVER_PARSE_LOG_MAX = 500;
|
||||
const CODEX_DYNAMIC_TOOL_SERVER_REQUEST_TIMEOUT_MS = 30_000;
|
||||
|
||||
type PendingRequest = {
|
||||
method: string;
|
||||
@@ -251,7 +252,13 @@ export class CodexAppServerClient {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
this.child.stdin.write(`${JSON.stringify(message)}\n`);
|
||||
const id = "id" in message ? message.id : undefined;
|
||||
const method = "method" in message ? message.method : undefined;
|
||||
this.child.stdin.write(`${JSON.stringify(message)}\n`, (error?: Error | null) => {
|
||||
if (error) {
|
||||
embeddedAgentLog.warn("codex app-server write failed", { error, id, method });
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private handleLine(line: string): void {
|
||||
@@ -311,12 +318,10 @@ export class CodexAppServerClient {
|
||||
request: Required<Pick<RpcRequest, "id" | "method">> & { params?: JsonValue },
|
||||
): Promise<void> {
|
||||
try {
|
||||
for (const handler of this.requestHandlers) {
|
||||
const result = await handler(request);
|
||||
if (result !== undefined) {
|
||||
this.writeMessage({ id: request.id, result });
|
||||
return;
|
||||
}
|
||||
const result = await this.runServerRequestHandlers(request);
|
||||
if (result !== undefined) {
|
||||
this.writeMessage({ id: request.id, result });
|
||||
return;
|
||||
}
|
||||
this.writeMessage({ id: request.id, result: defaultServerRequestResponse(request) });
|
||||
} catch (error) {
|
||||
@@ -329,6 +334,49 @@ export class CodexAppServerClient {
|
||||
}
|
||||
}
|
||||
|
||||
private async runServerRequestHandlers(
|
||||
request: Required<Pick<RpcRequest, "id" | "method">> & { params?: JsonValue },
|
||||
): Promise<JsonValue | undefined> {
|
||||
const timeoutResponse = timeoutServerRequestResponse(request);
|
||||
if (!timeoutResponse) {
|
||||
return await this.runServerRequestHandlersWithoutTimeout(request);
|
||||
}
|
||||
|
||||
let timeout: ReturnType<typeof setTimeout> | undefined;
|
||||
try {
|
||||
return await Promise.race([
|
||||
this.runServerRequestHandlersWithoutTimeout(request),
|
||||
new Promise<JsonValue>((resolve) => {
|
||||
timeout = setTimeout(() => {
|
||||
embeddedAgentLog.warn("codex app-server server request timed out", {
|
||||
id: request.id,
|
||||
method: request.method,
|
||||
timeoutMs: CODEX_DYNAMIC_TOOL_SERVER_REQUEST_TIMEOUT_MS,
|
||||
});
|
||||
resolve(timeoutResponse);
|
||||
}, CODEX_DYNAMIC_TOOL_SERVER_REQUEST_TIMEOUT_MS);
|
||||
timeout.unref?.();
|
||||
}),
|
||||
]);
|
||||
} finally {
|
||||
if (timeout) {
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async runServerRequestHandlersWithoutTimeout(
|
||||
request: Required<Pick<RpcRequest, "id" | "method">> & { params?: JsonValue },
|
||||
): Promise<JsonValue | undefined> {
|
||||
for (const handler of this.requestHandlers) {
|
||||
const result = await handler(request);
|
||||
if (result !== undefined) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
private handleNotification(notification: CodexServerNotification): void {
|
||||
for (const handler of this.notificationHandlers) {
|
||||
Promise.resolve(handler(notification)).catch((error: unknown) => {
|
||||
@@ -407,6 +455,23 @@ export function defaultServerRequestResponse(
|
||||
return {};
|
||||
}
|
||||
|
||||
function timeoutServerRequestResponse(
|
||||
request: Required<Pick<RpcRequest, "id" | "method">> & { params?: JsonValue },
|
||||
): JsonValue | undefined {
|
||||
if (request.method !== "item/tool/call") {
|
||||
return undefined;
|
||||
}
|
||||
return {
|
||||
contentItems: [
|
||||
{
|
||||
type: "inputText",
|
||||
text: `OpenClaw dynamic tool call timed out after ${CODEX_DYNAMIC_TOOL_SERVER_REQUEST_TIMEOUT_MS}ms before sending a response to Codex.`,
|
||||
},
|
||||
],
|
||||
success: false,
|
||||
};
|
||||
}
|
||||
|
||||
function assertSupportedCodexAppServerVersion(response: CodexInitializeResponse): void {
|
||||
const detectedVersion = readCodexVersionFromUserAgent(response.userAgent);
|
||||
if (!detectedVersion) {
|
||||
@@ -505,5 +570,6 @@ function formatExitValue(value: unknown): string {
|
||||
export const __testing = {
|
||||
closeCodexAppServerTransport,
|
||||
closeCodexAppServerTransportAndWait,
|
||||
CODEX_DYNAMIC_TOOL_SERVER_REQUEST_TIMEOUT_MS,
|
||||
redactCodexAppServerLinePreview,
|
||||
} as const;
|
||||
|
||||
@@ -674,6 +674,43 @@ describe("createCodexDynamicToolBridge", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("passes per-call abort signals into dynamic tool execution", async () => {
|
||||
let capturedSignal: AbortSignal | undefined;
|
||||
let resolveTool: ((result: AgentToolResult<unknown>) => void) | undefined;
|
||||
const execute = vi.fn(
|
||||
async (_callId: string, _args: Record<string, unknown>, signal: AbortSignal) =>
|
||||
await new Promise<AgentToolResult<unknown>>((resolve) => {
|
||||
capturedSignal = signal;
|
||||
resolveTool = resolve;
|
||||
}),
|
||||
);
|
||||
const runController = new AbortController();
|
||||
const callController = new AbortController();
|
||||
const bridge = createCodexDynamicToolBridge({
|
||||
tools: [createTool({ name: "exec", execute })],
|
||||
signal: runController.signal,
|
||||
});
|
||||
|
||||
const result = bridge.handleToolCall(
|
||||
{
|
||||
threadId: "thread-1",
|
||||
turnId: "turn-1",
|
||||
callId: "call-signal",
|
||||
namespace: null,
|
||||
tool: "exec",
|
||||
arguments: { command: "sleep" },
|
||||
},
|
||||
{ signal: callController.signal },
|
||||
);
|
||||
await vi.waitFor(() => expect(capturedSignal).toBeDefined());
|
||||
|
||||
callController.abort(new Error("deadline"));
|
||||
expect(capturedSignal?.aborted).toBe(true);
|
||||
resolveTool?.(textToolResult("done"));
|
||||
|
||||
await expect(result).resolves.toEqual(expectInputText("done"));
|
||||
});
|
||||
|
||||
it("does not double-wrap dynamic tools that already have before_tool_call", async () => {
|
||||
const beforeToolCall = vi.fn(async () => ({ params: { mode: "safe" } }));
|
||||
initializeGlobalHookRunner(
|
||||
|
||||
@@ -23,7 +23,10 @@ import {
|
||||
|
||||
export type CodexDynamicToolBridge = {
|
||||
specs: CodexDynamicToolSpec[];
|
||||
handleToolCall: (params: CodexDynamicToolCallParams) => Promise<CodexDynamicToolCallResponse>;
|
||||
handleToolCall: (
|
||||
params: CodexDynamicToolCallParams,
|
||||
options?: { signal?: AbortSignal },
|
||||
) => Promise<CodexDynamicToolCallResponse>;
|
||||
telemetry: {
|
||||
didSendViaMessagingTool: boolean;
|
||||
messagingToolSentTexts: string[];
|
||||
@@ -74,7 +77,7 @@ export function createCodexDynamicToolBridge(params: {
|
||||
inputSchema: toJsonValue(tool.parameters),
|
||||
})),
|
||||
telemetry,
|
||||
handleToolCall: async (call) => {
|
||||
handleToolCall: async (call, options) => {
|
||||
const tool = toolMap.get(call.tool);
|
||||
if (!tool) {
|
||||
return {
|
||||
@@ -84,9 +87,10 @@ export function createCodexDynamicToolBridge(params: {
|
||||
}
|
||||
const args = jsonObjectToRecord(call.arguments);
|
||||
const startedAt = Date.now();
|
||||
const signal = composeAbortSignals(params.signal, options?.signal);
|
||||
try {
|
||||
const preparedArgs = tool.prepareArguments ? tool.prepareArguments(args) : args;
|
||||
const rawResult = await tool.execute(call.callId, preparedArgs, params.signal);
|
||||
const rawResult = await tool.execute(call.callId, preparedArgs, signal);
|
||||
const rawIsError = isToolResultError(rawResult);
|
||||
const middlewareResult = await middlewareRunner.applyToolResultMiddleware({
|
||||
threadId: call.threadId,
|
||||
@@ -161,6 +165,17 @@ export function createCodexDynamicToolBridge(params: {
|
||||
};
|
||||
}
|
||||
|
||||
function composeAbortSignals(...signals: Array<AbortSignal | undefined>): AbortSignal {
|
||||
const activeSignals = signals.filter((signal): signal is AbortSignal => Boolean(signal));
|
||||
if (activeSignals.length === 0) {
|
||||
return new AbortController().signal;
|
||||
}
|
||||
if (activeSignals.length === 1) {
|
||||
return activeSignals[0]!;
|
||||
}
|
||||
return AbortSignal.any(activeSignals);
|
||||
}
|
||||
|
||||
function collectToolTelemetry(params: {
|
||||
toolName: string;
|
||||
args: Record<string, unknown>;
|
||||
|
||||
@@ -329,10 +329,47 @@ describe("runCodexAppServerAttempt", () => {
|
||||
nativeHookRelayTesting.clearNativeHookRelaysForTests();
|
||||
resetAgentEventsForTest();
|
||||
resetGlobalHookRunner();
|
||||
vi.useRealTimers();
|
||||
vi.restoreAllMocks();
|
||||
await fs.rm(tempDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("returns a failed dynamic tool response when an app-server tool call exceeds the deadline", async () => {
|
||||
vi.useFakeTimers();
|
||||
let capturedSignal: AbortSignal | undefined;
|
||||
const onTimeout = vi.fn();
|
||||
const response = __testing.handleDynamicToolCallWithTimeout({
|
||||
call: {
|
||||
threadId: "thread-1",
|
||||
turnId: "turn-1",
|
||||
callId: "call-timeout",
|
||||
namespace: null,
|
||||
tool: "message",
|
||||
arguments: { action: "send", text: "hello" },
|
||||
},
|
||||
toolBridge: {
|
||||
handleToolCall: vi.fn((_call, options) => {
|
||||
capturedSignal = options?.signal;
|
||||
return new Promise<never>(() => undefined);
|
||||
}),
|
||||
},
|
||||
signal: new AbortController().signal,
|
||||
timeoutMs: 1,
|
||||
onTimeout,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
|
||||
await expect(response).resolves.toEqual({
|
||||
success: false,
|
||||
contentItems: [
|
||||
{ type: "inputText", text: "OpenClaw dynamic tool call timed out after 1ms." },
|
||||
],
|
||||
});
|
||||
expect(capturedSignal?.aborted).toBe(true);
|
||||
expect(onTimeout).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("applies before_prompt_build to Codex developer instructions and turn input", async () => {
|
||||
const beforePromptBuild = vi.fn(async () => ({
|
||||
systemPrompt: "custom codex system",
|
||||
|
||||
@@ -44,7 +44,7 @@ import { isCodexAppServerApprovalRequest, type CodexAppServerClient } from "./cl
|
||||
import { ensureCodexComputerUse } from "./computer-use.js";
|
||||
import { resolveCodexAppServerRuntimeOptions } from "./config.js";
|
||||
import { projectContextEngineAssemblyForCodex } from "./context-engine-projection.js";
|
||||
import { createCodexDynamicToolBridge } from "./dynamic-tools.js";
|
||||
import { createCodexDynamicToolBridge, type CodexDynamicToolBridge } from "./dynamic-tools.js";
|
||||
import { handleCodexAppServerElicitationRequest } from "./elicitation-bridge.js";
|
||||
import { CodexAppServerEventProjector } from "./event-projector.js";
|
||||
import {
|
||||
@@ -60,6 +60,7 @@ import {
|
||||
isJsonObject,
|
||||
type CodexServerNotification,
|
||||
type CodexDynamicToolCallParams,
|
||||
type CodexDynamicToolCallResponse,
|
||||
type CodexTurnStartResponse,
|
||||
type JsonObject,
|
||||
type JsonValue,
|
||||
@@ -81,6 +82,8 @@ import { mirrorCodexAppServerTranscript } from "./transcript-mirror.js";
|
||||
import { createCodexUserInputBridge } from "./user-input-bridge.js";
|
||||
import { filterToolsForVisionInputs } from "./vision-tools.js";
|
||||
|
||||
const CODEX_DYNAMIC_TOOL_TIMEOUT_MS = 30_000;
|
||||
|
||||
type OpenClawCodingToolsOptions = NonNullable<
|
||||
Parameters<(typeof import("openclaw/plugin-sdk/agent-harness"))["createOpenClawCodingTools"]>[0]
|
||||
>;
|
||||
@@ -476,7 +479,21 @@ export async function runCodexAppServerAttempt(
|
||||
name: call.tool,
|
||||
arguments: call.arguments,
|
||||
});
|
||||
const response = await toolBridge.handleToolCall(call);
|
||||
const response = await handleDynamicToolCallWithTimeout({
|
||||
call,
|
||||
toolBridge,
|
||||
signal: runAbortController.signal,
|
||||
timeoutMs: CODEX_DYNAMIC_TOOL_TIMEOUT_MS,
|
||||
onTimeout: () => {
|
||||
trajectoryRecorder?.recordEvent("tool.timeout", {
|
||||
threadId: call.threadId,
|
||||
turnId: call.turnId,
|
||||
toolCallId: call.callId,
|
||||
name: call.tool,
|
||||
timeoutMs: CODEX_DYNAMIC_TOOL_TIMEOUT_MS,
|
||||
});
|
||||
},
|
||||
});
|
||||
trajectoryRecorder?.recordEvent("tool.result", {
|
||||
threadId: call.threadId,
|
||||
turnId: call.turnId,
|
||||
@@ -779,6 +796,79 @@ export async function runCodexAppServerAttempt(
|
||||
}
|
||||
}
|
||||
|
||||
async function handleDynamicToolCallWithTimeout(params: {
|
||||
call: CodexDynamicToolCallParams;
|
||||
toolBridge: Pick<CodexDynamicToolBridge, "handleToolCall">;
|
||||
signal: AbortSignal;
|
||||
timeoutMs: number;
|
||||
onTimeout?: () => void;
|
||||
}): Promise<CodexDynamicToolCallResponse> {
|
||||
if (params.signal.aborted) {
|
||||
return failedDynamicToolResponse("OpenClaw dynamic tool call aborted before execution.");
|
||||
}
|
||||
|
||||
const controller = new AbortController();
|
||||
let timeout: ReturnType<typeof setTimeout> | undefined;
|
||||
let timedOut = false;
|
||||
let resolveAbort: ((response: CodexDynamicToolCallResponse) => void) | undefined;
|
||||
const abortFromRun = () => {
|
||||
const message = "OpenClaw dynamic tool call aborted.";
|
||||
controller.abort(params.signal.reason ?? new Error(message));
|
||||
resolveAbort?.(failedDynamicToolResponse(message));
|
||||
};
|
||||
const abortPromise = new Promise<CodexDynamicToolCallResponse>((resolve) => {
|
||||
resolveAbort = resolve;
|
||||
});
|
||||
const timeoutPromise = new Promise<CodexDynamicToolCallResponse>((resolve) => {
|
||||
const timeoutMs = Math.max(1, Math.min(CODEX_DYNAMIC_TOOL_TIMEOUT_MS, params.timeoutMs));
|
||||
timeout = setTimeout(() => {
|
||||
timedOut = true;
|
||||
const message = `OpenClaw dynamic tool call timed out after ${timeoutMs}ms.`;
|
||||
controller.abort(new Error(message));
|
||||
params.onTimeout?.();
|
||||
embeddedAgentLog.warn("codex dynamic tool call timed out", {
|
||||
tool: params.call.tool,
|
||||
toolCallId: params.call.callId,
|
||||
threadId: params.call.threadId,
|
||||
turnId: params.call.turnId,
|
||||
timeoutMs,
|
||||
});
|
||||
resolve(failedDynamicToolResponse(message));
|
||||
}, timeoutMs);
|
||||
timeout.unref?.();
|
||||
});
|
||||
|
||||
try {
|
||||
params.signal.addEventListener("abort", abortFromRun, { once: true });
|
||||
if (params.signal.aborted) {
|
||||
abortFromRun();
|
||||
}
|
||||
return await Promise.race([
|
||||
params.toolBridge.handleToolCall(params.call, { signal: controller.signal }),
|
||||
abortPromise,
|
||||
timeoutPromise,
|
||||
]);
|
||||
} catch (error) {
|
||||
return failedDynamicToolResponse(error instanceof Error ? error.message : String(error));
|
||||
} finally {
|
||||
if (timeout) {
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
params.signal.removeEventListener("abort", abortFromRun);
|
||||
resolveAbort = undefined;
|
||||
if (!timedOut && !controller.signal.aborted) {
|
||||
controller.abort(new Error("OpenClaw dynamic tool call finished."));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function failedDynamicToolResponse(message: string): CodexDynamicToolCallResponse {
|
||||
return {
|
||||
success: false,
|
||||
contentItems: [{ type: "inputText", text: message }],
|
||||
};
|
||||
}
|
||||
|
||||
function createCodexNativeHookRelay(params: {
|
||||
options:
|
||||
| {
|
||||
@@ -1075,7 +1165,9 @@ function handleApprovalRequest(params: {
|
||||
}
|
||||
|
||||
export const __testing = {
|
||||
CODEX_DYNAMIC_TOOL_TIMEOUT_MS,
|
||||
filterToolsForVisionInputs,
|
||||
handleDynamicToolCallWithTimeout,
|
||||
...createCodexAppServerClientFactoryTestHooks((factory) => {
|
||||
clientFactory = factory;
|
||||
}),
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
export type CodexAppServerTransport = {
|
||||
stdin: {
|
||||
write: (data: string) => unknown;
|
||||
write: (data: string, callback?: (error?: Error | null) => void) => unknown;
|
||||
end?: () => unknown;
|
||||
destroy?: () => unknown;
|
||||
unref?: () => unknown;
|
||||
|
||||
Reference in New Issue
Block a user