fix(codex): release quiet app-server turns

This commit is contained in:
Peter Steinberger
2026-04-29 19:41:51 +01:00
parent fbae2a6441
commit 072e73d7c3
4 changed files with 261 additions and 66 deletions

View File

@@ -370,6 +370,79 @@ describe("runCodexAppServerAttempt", () => {
expect(onTimeout).toHaveBeenCalledTimes(1);
});
it("releases the session when Codex never completes after a dynamic tool response", async () => {
let handleRequest:
| ((request: { id: string; method: string; params?: unknown }) => Promise<unknown>)
| undefined;
const request = vi.fn(async (method: string) => {
if (method === "thread/start") {
return threadStartResult("thread-1");
}
if (method === "turn/start") {
return turnStartResult("turn-1", "inProgress");
}
return {};
});
__testing.setCodexAppServerClientFactoryForTests(
async () =>
({
request,
addNotificationHandler: () => () => undefined,
addRequestHandler: (
handler: (request: {
id: string;
method: string;
params?: unknown;
}) => Promise<unknown>,
) => {
handleRequest = handler;
return () => undefined;
},
}) as never,
);
const params = createParams(
path.join(tempDir, "session.jsonl"),
path.join(tempDir, "workspace"),
);
params.timeoutMs = 60_000;
const run = runCodexAppServerAttempt(params, { turnCompletionIdleTimeoutMs: 5 });
await vi.waitFor(() => expect(handleRequest).toBeTypeOf("function"), { interval: 1 });
await expect(
handleRequest?.({
id: "request-tool-1",
method: "item/tool/call",
params: {
threadId: "thread-1",
turnId: "turn-1",
callId: "call-1",
namespace: null,
tool: "message",
arguments: { action: "send", text: "already sent" },
},
}),
).resolves.toMatchObject({
success: false,
contentItems: [{ type: "inputText", text: "Unknown OpenClaw tool: message" }],
});
await expect(run).resolves.toMatchObject({
aborted: true,
timedOut: true,
promptError: "codex app-server turn idle timed out waiting for turn/completed",
});
await vi.waitFor(
() =>
expect(request).toHaveBeenCalledWith("turn/interrupt", {
threadId: "thread-1",
turnId: "turn-1",
}),
{ interval: 1 },
);
expect(queueAgentHarnessMessage("session-1", "after timeout")).toBe(false);
});
it("applies before_prompt_build to Codex developer instructions and turn input", async () => {
const beforePromptBuild = vi.fn(async () => ({
systemPrompt: "custom codex system",

View File

@@ -83,6 +83,7 @@ import { createCodexUserInputBridge } from "./user-input-bridge.js";
import { filterToolsForVisionInputs } from "./vision-tools.js";
const CODEX_DYNAMIC_TOOL_TIMEOUT_MS = 30_000;
const CODEX_TURN_COMPLETION_IDLE_TIMEOUT_MS = 60_000;
type OpenClawCodingToolsOptions = NonNullable<
Parameters<(typeof import("openclaw/plugin-sdk/agent-harness"))["createOpenClawCodingTools"]>[0]
@@ -132,6 +133,7 @@ export async function runCodexAppServerAttempt(
gatewayTimeoutMs?: number;
hookTimeoutSec?: number;
};
turnCompletionIdleTimeoutMs?: number;
} = {},
): Promise<EmbeddedRunAttemptResult> {
const attemptStartedAt = Date.now();
@@ -364,6 +366,8 @@ export async function runCodexAppServerAttempt(
let userInputBridge: ReturnType<typeof createCodexUserInputBridge> | undefined;
let completed = false;
let timedOut = false;
let turnCompletionIdleTimedOut = false;
let turnCompletionIdleTimeoutMessage: string | undefined;
let lifecycleStarted = false;
let lifecycleTerminalEmitted = false;
let resolveCompletion: (() => void) | undefined;
@@ -371,6 +375,82 @@ export async function runCodexAppServerAttempt(
resolveCompletion = resolve;
});
let notificationQueue: Promise<void> = Promise.resolve();
const turnCompletionIdleTimeoutMs = resolveCodexTurnCompletionIdleTimeoutMs(
options.turnCompletionIdleTimeoutMs,
);
let turnCompletionIdleTimer: ReturnType<typeof setTimeout> | undefined;
let turnCompletionIdleWatchArmed = false;
let turnCompletionLastActivityAt = Date.now();
let turnCompletionLastActivityReason = "startup";
let activeAppServerTurnRequests = 0;
const clearTurnCompletionIdleTimer = () => {
if (turnCompletionIdleTimer) {
clearTimeout(turnCompletionIdleTimer);
turnCompletionIdleTimer = undefined;
}
};
const fireTurnCompletionIdleTimeout = () => {
if (
completed ||
runAbortController.signal.aborted ||
!turnCompletionIdleWatchArmed ||
activeAppServerTurnRequests > 0
) {
return;
}
const idleMs = Math.max(0, Date.now() - turnCompletionLastActivityAt);
if (idleMs < turnCompletionIdleTimeoutMs) {
scheduleTurnCompletionIdleWatch();
return;
}
timedOut = true;
turnCompletionIdleTimedOut = true;
turnCompletionIdleTimeoutMessage =
"codex app-server turn idle timed out waiting for turn/completed";
projector?.markTimedOut();
trajectoryRecorder?.recordEvent("turn.completion_idle_timeout", {
threadId: thread.threadId,
turnId,
idleMs,
timeoutMs: turnCompletionIdleTimeoutMs,
lastActivityReason: turnCompletionLastActivityReason,
});
embeddedAgentLog.warn("codex app-server turn idle timed out waiting for completion", {
threadId: thread.threadId,
turnId,
idleMs,
timeoutMs: turnCompletionIdleTimeoutMs,
lastActivityReason: turnCompletionLastActivityReason,
});
runAbortController.abort("turn_completion_idle_timeout");
};
function scheduleTurnCompletionIdleWatch() {
clearTurnCompletionIdleTimer();
if (
completed ||
runAbortController.signal.aborted ||
!turnCompletionIdleWatchArmed ||
activeAppServerTurnRequests > 0
) {
return;
}
const elapsedMs = Math.max(0, Date.now() - turnCompletionLastActivityAt);
const delayMs = Math.max(1, turnCompletionIdleTimeoutMs - elapsedMs);
turnCompletionIdleTimer = setTimeout(fireTurnCompletionIdleTimeout, delayMs);
turnCompletionIdleTimer.unref?.();
}
const touchTurnCompletionActivity = (reason: string, options?: { arm?: boolean }) => {
turnCompletionLastActivityAt = Date.now();
turnCompletionLastActivityReason = reason;
if (options?.arm) {
turnCompletionIdleWatchArmed = true;
}
scheduleTurnCompletionIdleWatch();
};
const emitLifecycleStart = () => {
emitCodexAppServerEvent(params, {
@@ -396,6 +476,7 @@ export async function runCodexAppServerAttempt(
};
const handleNotification = async (notification: CodexServerNotification) => {
touchTurnCompletionActivity(`notification:${notification.method}`);
userInputBridge?.handleNotification(notification);
if (!projector || !turnId) {
pendingNotifications.push(notification);
@@ -417,6 +498,7 @@ export async function runCodexAppServerAttempt(
} finally {
if (isTurnCompletion) {
completed = true;
clearTurnCompletionIdleTimer();
resolveCompletion?.();
}
}
@@ -431,78 +513,93 @@ export async function runCodexAppServerAttempt(
const notificationCleanup = client.addNotificationHandler(enqueueNotification);
const requestCleanup = client.addRequestHandler(async (request) => {
if (request.method === "account/chatgptAuthTokens/refresh") {
return refreshCodexAppServerAuthTokens({
agentDir,
authProfileId: startupAuthProfileId,
});
}
if (!turnId) {
return undefined;
}
if (request.method === "mcpServer/elicitation/request") {
return handleCodexAppServerElicitationRequest({
requestParams: request.params,
paramsForRun: params,
threadId: thread.threadId,
turnId,
signal: runAbortController.signal,
});
}
if (request.method === "item/tool/requestUserInput") {
return userInputBridge?.handleRequest({
id: request.id,
params: request.params,
});
}
if (request.method !== "item/tool/call") {
if (isCodexAppServerApprovalRequest(request.method)) {
return handleApprovalRequest({
method: request.method,
params: request.params,
activeAppServerTurnRequests += 1;
clearTurnCompletionIdleTimer();
touchTurnCompletionActivity(`request:${request.method}`);
let armCompletionWatchOnResponse = false;
try {
if (request.method === "account/chatgptAuthTokens/refresh") {
return refreshCodexAppServerAuthTokens({
agentDir,
authProfileId: startupAuthProfileId,
});
}
if (!turnId) {
return undefined;
}
if (request.method === "mcpServer/elicitation/request") {
armCompletionWatchOnResponse = true;
return handleCodexAppServerElicitationRequest({
requestParams: request.params,
paramsForRun: params,
threadId: thread.threadId,
turnId,
signal: runAbortController.signal,
});
}
return undefined;
}
const call = readDynamicToolCallParams(request.params);
if (!call || call.threadId !== thread.threadId || call.turnId !== turnId) {
return undefined;
}
trajectoryRecorder?.recordEvent("tool.call", {
threadId: call.threadId,
turnId: call.turnId,
toolCallId: call.callId,
name: call.tool,
arguments: call.arguments,
});
const response = await handleDynamicToolCallWithTimeout({
call,
toolBridge,
signal: runAbortController.signal,
timeoutMs: CODEX_DYNAMIC_TOOL_TIMEOUT_MS,
onTimeout: () => {
trajectoryRecorder?.recordEvent("tool.timeout", {
threadId: call.threadId,
turnId: call.turnId,
toolCallId: call.callId,
name: call.tool,
timeoutMs: CODEX_DYNAMIC_TOOL_TIMEOUT_MS,
if (request.method === "item/tool/requestUserInput") {
armCompletionWatchOnResponse = true;
return userInputBridge?.handleRequest({
id: request.id,
params: request.params,
});
},
});
trajectoryRecorder?.recordEvent("tool.result", {
threadId: call.threadId,
turnId: call.turnId,
toolCallId: call.callId,
name: call.tool,
success: response.success,
contentItems: response.contentItems,
});
return response as JsonValue;
}
if (request.method !== "item/tool/call") {
if (isCodexAppServerApprovalRequest(request.method)) {
armCompletionWatchOnResponse = true;
return handleApprovalRequest({
method: request.method,
params: request.params,
paramsForRun: params,
threadId: thread.threadId,
turnId,
signal: runAbortController.signal,
});
}
return undefined;
}
const call = readDynamicToolCallParams(request.params);
if (!call || call.threadId !== thread.threadId || call.turnId !== turnId) {
return undefined;
}
armCompletionWatchOnResponse = true;
trajectoryRecorder?.recordEvent("tool.call", {
threadId: call.threadId,
turnId: call.turnId,
toolCallId: call.callId,
name: call.tool,
arguments: call.arguments,
});
const response = await handleDynamicToolCallWithTimeout({
call,
toolBridge,
signal: runAbortController.signal,
timeoutMs: CODEX_DYNAMIC_TOOL_TIMEOUT_MS,
onTimeout: () => {
trajectoryRecorder?.recordEvent("tool.timeout", {
threadId: call.threadId,
turnId: call.turnId,
toolCallId: call.callId,
name: call.tool,
timeoutMs: CODEX_DYNAMIC_TOOL_TIMEOUT_MS,
});
},
});
trajectoryRecorder?.recordEvent("tool.result", {
threadId: call.threadId,
turnId: call.turnId,
toolCallId: call.callId,
name: call.tool,
success: response.success,
contentItems: response.contentItems,
});
return response as JsonValue;
} finally {
activeAppServerTurnRequests = Math.max(0, activeAppServerTurnRequests - 1);
touchTurnCompletionActivity(`request:${request.method}:response`, {
arm: armCompletionWatchOnResponse,
});
}
});
const llmInputEvent = {
@@ -638,6 +735,7 @@ export async function runCodexAppServerAttempt(
abort: () => runAbortController.abort("aborted"),
};
setActiveEmbeddedRun(params.sessionId, handle, params.sessionKey);
touchTurnCompletionActivity("turn:start");
const timeout = setTimeout(
() => {
@@ -664,7 +762,11 @@ export async function runCodexAppServerAttempt(
await completion;
const result = activeProjector.buildResult(toolBridge.telemetry, { yieldDetected });
const finalAborted = result.aborted || runAbortController.signal.aborted;
const finalPromptError = timedOut ? "codex app-server attempt timed out" : result.promptError;
const finalPromptError = turnCompletionIdleTimedOut
? turnCompletionIdleTimeoutMessage
: timedOut
? "codex app-server attempt timed out"
: result.promptError;
const finalPromptErrorSource = timedOut ? "prompt" : result.promptErrorSource;
recordCodexTrajectoryCompletion(trajectoryRecorder, {
attempt: params,
@@ -787,6 +889,7 @@ export async function runCodexAppServerAttempt(
await trajectoryRecorder?.flush();
userInputBridge?.cancelPending();
clearTimeout(timeout);
clearTurnCompletionIdleTimer();
notificationCleanup();
requestCleanup();
nativeHookRelay?.unregister();
@@ -1055,6 +1158,16 @@ async function withCodexStartupTimeout<T>(params: {
}
}
function resolveCodexTurnCompletionIdleTimeoutMs(value: number | undefined): number {
if (value === undefined) {
return CODEX_TURN_COMPLETION_IDLE_TIMEOUT_MS;
}
if (!Number.isFinite(value)) {
return CODEX_TURN_COMPLETION_IDLE_TIMEOUT_MS;
}
return Math.max(1, Math.floor(value));
}
function readDynamicToolCallParams(
value: JsonValue | undefined,
): CodexDynamicToolCallParams | undefined {
@@ -1166,6 +1279,7 @@ function handleApprovalRequest(params: {
export const __testing = {
CODEX_DYNAMIC_TOOL_TIMEOUT_MS,
CODEX_TURN_COMPLETION_IDLE_TIMEOUT_MS,
filterToolsForVisionInputs,
handleDynamicToolCallWithTimeout,
...createCodexAppServerClientFactoryTestHooks((factory) => {