fix: bridge codex request user input

This commit is contained in:
Peter Steinberger
2026-04-24 04:40:39 +01:00
parent eb6e1245ac
commit 53aac30f51
8 changed files with 680 additions and 5 deletions

View File

@@ -79,6 +79,46 @@ describe("Codex app-server approval bridge", () => {
);
});
// Verifies that when an approval request carries `commandActions`, the approval description and
// the emitted approval event show the parsed action command instead of the raw `bash -lc` wrapper.
it("describes command approvals from parsed command actions when available", async () => {
const params = createParams();
// The gateway tool call resolves with an allow-once decision.
mockCallGatewayTool.mockResolvedValueOnce({
id: "plugin:approval-actions",
decision: "allow-once",
});
await handleCodexAppServerApprovalRequest({
method: "item/commandExecution/requestApproval",
requestParams: {
threadId: "thread-1",
turnId: "turn-1",
itemId: "cmd-actions",
// Raw command includes the shell wrapper; the parsed action below does not.
command: "bash -lc 'pnpm test extensions/codex'",
commandActions: [{ command: "pnpm test extensions/codex" }],
},
paramsForRun: params,
threadId: "thread-1",
turnId: "turn-1",
});
// Third argument of the first gateway call is the request payload under test.
const [, , requestPayload] = mockCallGatewayTool.mock.calls[0] ?? [];
expect(requestPayload).toEqual(
expect.objectContaining({
description: expect.stringContaining("Command: pnpm test extensions/codex"),
}),
);
// The shell wrapper must not appear in the description.
expect(requestPayload).toEqual(
expect.objectContaining({
description: expect.not.stringContaining("bash -lc"),
}),
);
// The approval stream event carries the same parsed command.
expect(params.onAgentEvent).toHaveBeenCalledWith(
expect.objectContaining({
stream: "approval",
data: expect.objectContaining({ command: "pnpm test extensions/codex" }),
}),
);
});
it("fails closed when no approval route is available", async () => {
const params = createParams();
mockCallGatewayTool.mockResolvedValueOnce({
@@ -266,6 +306,57 @@ describe("Codex app-server approval bridge", () => {
).toEqual({
decision: "accept",
});
expect(
buildApprovalResponse(
"item/commandExecution/requestApproval",
{
availableDecisions: [
"accept",
{
acceptWithExecpolicyAmendment: {
execpolicy_amendment: {
permissions: [{ permission: "allow", command: ["pnpm", "test"] }],
},
},
},
],
},
"approved-session",
),
).toEqual({
decision: {
acceptWithExecpolicyAmendment: {
execpolicy_amendment: {
permissions: [{ permission: "allow", command: ["pnpm", "test"] }],
},
},
},
});
expect(
buildApprovalResponse(
"item/commandExecution/requestApproval",
{
availableDecisions: [
{
applyNetworkPolicyAmendment: {
network_policy_amendment: {
domain: "registry.npmjs.org",
},
},
},
],
},
"approved-session",
),
).toEqual({
decision: {
applyNetworkPolicyAmendment: {
network_policy_amendment: {
domain: "registry.npmjs.org",
},
},
},
});
expect(buildApprovalResponse("item/fileChange/requestApproval", undefined, "denied")).toEqual({
decision: "decline",
});

View File

@@ -161,7 +161,7 @@ function buildApprovalContext(params: {
readString(params.requestParams, "itemId") ??
readString(params.requestParams, "callId") ??
readString(params.requestParams, "approvalId");
const command = readCommand(params.requestParams);
const command = readDisplayCommand(params.requestParams);
const reason = readString(params.requestParams, "reason");
const kind = approvalKindForMethod(params.method);
const permissionLines =
@@ -220,8 +220,14 @@ function commandApprovalDecision(
if (outcome === "denied" || outcome === "unavailable") {
return "decline";
}
if (outcome === "approved-session" && hasAvailableDecision(requestParams, "acceptForSession")) {
return "acceptForSession";
if (outcome === "approved-session") {
if (hasAvailableDecision(requestParams, "acceptForSession")) {
return "acceptForSession";
}
const amendmentDecision = findAvailableCommandAmendmentDecision(requestParams);
if (amendmentDecision) {
return amendmentDecision;
}
}
return "accept";
}
@@ -459,6 +465,21 @@ function hasAvailableDecision(requestParams: JsonObject | undefined, decision: s
return available.includes(decision);
}
/**
 * Returns the first entry of `requestParams.availableDecisions` that is an object carrying
 * either an `acceptWithExecpolicyAmendment` or an `applyNetworkPolicyAmendment` object.
 *
 * @returns the matching decision entry unchanged, or `undefined` when `availableDecisions`
 *   is not an array or holds no such entry.
 */
function findAvailableCommandAmendmentDecision(
  requestParams: JsonObject | undefined,
): JsonValue | undefined {
  const decisions = requestParams?.availableDecisions;
  if (!Array.isArray(decisions)) {
    return undefined;
  }
  for (const candidate of decisions) {
    // Plain string decisions (e.g. "accept") are never amendments.
    if (!isJsonObject(candidate)) {
      continue;
    }
    const hasExecPolicyAmendment = isJsonObject(candidate.acceptWithExecpolicyAmendment);
    if (hasExecPolicyAmendment || isJsonObject(candidate.applyNetworkPolicyAmendment)) {
      return candidate;
    }
  }
  return undefined;
}
function approvalResolutionMessage(outcome: AppServerApprovalOutcome): string {
if (outcome === "approved-session") {
return "Codex app-server approval granted for the session.";
@@ -510,6 +531,25 @@ function emitApprovalEvent(params: EmbeddedRunAttemptParams, data: AgentApproval
params.onAgentEvent?.({ stream: "approval", data: data as unknown as Record<string, unknown> });
}
/**
 * Picks the command text to show for an approval request: the command derived from
 * `commandActions` when there is one, otherwise whatever `readCommand` yields.
 */
function readDisplayCommand(record: JsonObject | undefined): string | undefined {
  // Truthiness (not nullishness) decides the fallback, matching an empty/absent action command.
  return readCommandActions(record) || readCommand(record);
}
/**
 * Builds a single display string from `record.commandActions`.
 *
 * Each object entry contributes its string `command`; entries that are not objects or have no
 * truthy command are ignored. The collected commands are joined with " && ".
 *
 * @returns the joined command string, or `undefined` when `commandActions` is not an array or
 *   contributes no command.
 */
function readCommandActions(record: JsonObject | undefined): string | undefined {
  const rawActions = record?.commandActions;
  if (!Array.isArray(rawActions)) {
    return undefined;
  }
  const parts: string[] = [];
  for (const rawAction of rawActions) {
    if (!isJsonObject(rawAction)) {
      continue;
    }
    const text = readString(rawAction, "command");
    if (text) {
      parts.push(text);
    }
  }
  return parts.length === 0 ? undefined : parts.join(" && ");
}
function readCommand(record: JsonObject | undefined): string | undefined {
const command = record?.command;
if (typeof command === "string") {

View File

@@ -653,6 +653,97 @@ describe("runCodexAppServerAttempt", () => {
await run;
});
// Verifies the end-to-end wiring in the run attempt: an `item/tool/requestUserInput` server
// request produces a prompt via `onBlockReply`, the next queued harness message is consumed as
// the answer, and that message is not forwarded to Codex as a `turn/steer` request.
it("routes request_user_input prompts through the active run follow-up queue", async () => {
// Captured handlers registered by the run attempt on the fake client.
let notify: (notification: CodexServerNotification) => Promise<void> = async () => undefined;
let handleRequest:
| ((request: { id: string; method: string; params?: unknown }) => Promise<unknown>)
| undefined;
// Fake client request: only thread/start and turn/start return meaningful results.
const request = vi.fn(async (method: string) => {
if (method === "thread/start") {
return threadStartResult();
}
if (method === "turn/start") {
return turnStartResult();
}
return {};
});
__testing.setCodexAppServerClientFactoryForTests(
async () =>
({
request,
addNotificationHandler: (handler: typeof notify) => {
notify = handler;
return () => undefined;
},
addRequestHandler: (
handler: (request: {
id: string;
method: string;
params?: unknown;
}) => Promise<unknown>,
) => {
handleRequest = handler;
return () => undefined;
},
}) as never,
);
const params = createParams(
path.join(tempDir, "session.jsonl"),
path.join(tempDir, "workspace"),
);
params.onBlockReply = vi.fn();
const run = runCodexAppServerAttempt(params);
// Wait until the turn has been started and the request handler has been registered.
await vi.waitFor(
() => expect(request.mock.calls.some(([method]) => method === "turn/start")).toBe(true),
{ interval: 1 },
);
await vi.waitFor(() => expect(handleRequest).toBeTypeOf("function"), { interval: 1 });
// Deliberately not awaited: the response settles only after an answer is queued below.
const response = handleRequest?.({
id: "request-input-1",
method: "item/tool/requestUserInput",
params: {
threadId: "thread-1",
turnId: "turn-1",
itemId: "ask-1",
questions: [
{
id: "mode",
header: "Mode",
question: "Pick a mode",
isOther: false,
isSecret: false,
options: [
{ label: "Fast", description: "Use less reasoning" },
{ label: "Deep", description: "Use more reasoning" },
],
},
],
},
});
await vi.waitFor(() => expect(params.onBlockReply).toHaveBeenCalledTimes(1), { interval: 1 });
// "2" selects the second option, so the expected answer is the "Deep" label.
expect(queueAgentHarnessMessage("session-1", "2")).toBe(true);
await expect(response).resolves.toEqual({
answers: { mode: { answers: ["Deep"] } },
});
// The queued message was consumed as the answer and must not also steer the turn.
expect(request).not.toHaveBeenCalledWith(
"turn/steer",
expect.objectContaining({ expectedTurnId: "turn-1" }),
);
// Complete the turn so the run promise can settle.
await notify({
method: "turn/completed",
params: {
threadId: "thread-1",
turnId: "turn-1",
turn: { id: "turn-1", status: "completed" },
},
});
await run;
});
it("does not leak unhandled rejections when shutdown closes before interrupt", async () => {
const unhandledRejections: unknown[] = [];
const onUnhandledRejection = (reason: unknown) => {

View File

@@ -58,6 +58,7 @@ import {
recordCodexTrajectoryContext,
} from "./trajectory.js";
import { mirrorCodexAppServerTranscript } from "./transcript-mirror.js";
import { createCodexUserInputBridge } from "./user-input-bridge.js";
import { filterToolsForVisionInputs } from "./vision-tools.js";
let clientFactory = defaultCodexAppServerClientFactory;
@@ -211,6 +212,7 @@ export async function runCodexAppServerAttempt(
let projector: CodexAppServerEventProjector | undefined;
let turnId: string | undefined;
const pendingNotifications: CodexServerNotification[] = [];
let userInputBridge: ReturnType<typeof createCodexUserInputBridge> | undefined;
let completed = false;
let timedOut = false;
let resolveCompletion: (() => void) | undefined;
@@ -220,6 +222,7 @@ export async function runCodexAppServerAttempt(
let notificationQueue: Promise<void> = Promise.resolve();
const handleNotification = async (notification: CodexServerNotification) => {
userInputBridge?.handleNotification(notification);
if (!projector || !turnId) {
pendingNotifications.push(notification);
return;
@@ -266,6 +269,12 @@ export async function runCodexAppServerAttempt(
signal: runAbortController.signal,
});
}
if (request.method === "item/tool/requestUserInput") {
return userInputBridge?.handleRequest({
id: request.id,
params: request.params,
});
}
if (request.method !== "item/tool/call") {
if (isCodexAppServerApprovalRequest(request.method)) {
return handleApprovalRequest({
@@ -382,6 +391,12 @@ export async function runCodexAppServerAttempt(
}
turnId = turn.turn.id;
const activeTurnId = turn.turn.id;
userInputBridge = createCodexUserInputBridge({
paramsForRun: params,
threadId: thread.threadId,
turnId: activeTurnId,
signal: runAbortController.signal,
});
trajectoryRecorder?.recordEvent("prompt.submitted", {
threadId: thread.threadId,
turnId: activeTurnId,
@@ -407,6 +422,9 @@ export async function runCodexAppServerAttempt(
const handle = {
kind: "embedded" as const,
queueMessage: async (text: string) => {
if (userInputBridge?.handleQueuedMessage(text)) {
return;
}
await client.request("turn/steer", {
threadId: thread.threadId,
expectedTurnId: activeTurnId,
@@ -511,6 +529,7 @@ export async function runCodexAppServerAttempt(
});
}
await trajectoryRecorder?.flush();
userInputBridge?.cancelPending();
clearTimeout(timeout);
notificationCleanup();
requestCleanup();

View File

@@ -0,0 +1,137 @@
import type { EmbeddedRunAttemptParams } from "openclaw/plugin-sdk/agent-harness-runtime";
import { describe, expect, it, vi } from "vitest";
import { createCodexUserInputBridge } from "./user-input-bridge.js";
/**
 * Builds the minimal run-attempt params used by these tests: fixed session identifiers plus a
 * mocked `onBlockReply` so prompt delivery can be observed.
 */
function createParams(): EmbeddedRunAttemptParams {
  const stub = {
    sessionId: "session-1",
    sessionKey: "agent:main:session-1",
    onBlockReply: vi.fn(),
  };
  // Only the fields above are populated, hence the double cast.
  return stub as unknown as EmbeddedRunAttemptParams;
}
// Unit tests for createCodexUserInputBridge: prompt delivery, answer mapping, and cleanup when
// the server resolves the request on its own.
describe("Codex app-server user input bridge", () => {
// Single question with options: the prompt is sent via onBlockReply and the numeric reply "2"
// resolves to the label of the second option.
it("prompts the originating chat and resolves request_user_input from the next queued message", async () => {
const params = createParams();
const bridge = createCodexUserInputBridge({
paramsForRun: params,
threadId: "thread-1",
turnId: "turn-1",
});
// Not awaited here: the promise settles once a message is queued.
const response = bridge.handleRequest({
id: "input-1",
params: {
threadId: "thread-1",
turnId: "turn-1",
itemId: "tool-1",
questions: [
{
id: "choice",
header: "Mode",
question: "Pick a mode",
isOther: false,
isSecret: false,
options: [
{ label: "Fast", description: "Use less reasoning" },
{ label: "Deep", description: "Use more reasoning" },
],
},
],
},
});
await vi.waitFor(() => expect(params.onBlockReply).toHaveBeenCalledTimes(1));
expect(params.onBlockReply).toHaveBeenCalledWith({
text: expect.stringContaining("Pick a mode"),
});
expect(bridge.handleQueuedMessage("2")).toBe(true);
await expect(response).resolves.toEqual({
answers: { choice: { answers: ["Deep"] } },
});
});
// Two questions answered with "<id>: <value>" lines: each value lands under its question id.
it("maps keyed multi-question replies to Codex answer ids", async () => {
const params = createParams();
const bridge = createCodexUserInputBridge({
paramsForRun: params,
threadId: "thread-1",
turnId: "turn-1",
});
const response = bridge.handleRequest({
id: "input-2",
params: {
threadId: "thread-1",
turnId: "turn-1",
itemId: "tool-1",
questions: [
{
id: "repo",
header: "Repository",
question: "Which repo?",
isOther: true,
isSecret: false,
options: null,
},
{
id: "scope",
header: "Scope",
question: "Which scope?",
isOther: false,
isSecret: false,
options: [{ label: "Tests", description: "Only tests" }],
},
],
},
});
await vi.waitFor(() => expect(params.onBlockReply).toHaveBeenCalledTimes(1));
expect(bridge.handleQueuedMessage("repo: openclaw\nscope: Tests")).toBe(true);
await expect(response).resolves.toEqual({
answers: {
repo: { answers: ["openclaw"] },
scope: { answers: ["Tests"] },
},
});
});
// A matching serverRequest/resolved notification settles the pending request with an empty
// answer set; a later queued message is then no longer consumed by the bridge.
it("clears pending prompts when Codex resolves the server request itself", async () => {
const params = createParams();
const bridge = createCodexUserInputBridge({
paramsForRun: params,
threadId: "thread-1",
turnId: "turn-1",
});
const response = bridge.handleRequest({
id: "input-3",
params: {
threadId: "thread-1",
turnId: "turn-1",
itemId: "tool-1",
questions: [
{
id: "answer",
header: "Answer",
question: "Continue?",
isOther: true,
isSecret: false,
options: null,
},
],
},
});
await vi.waitFor(() => expect(params.onBlockReply).toHaveBeenCalledTimes(1));
// requestId matches the id passed to handleRequest above.
bridge.handleNotification({
method: "serverRequest/resolved",
params: { threadId: "thread-1", requestId: "input-3" },
});
await expect(response).resolves.toEqual({ answers: {} });
expect(bridge.handleQueuedMessage("too late")).toBe(false);
});
});

View File

@@ -0,0 +1,294 @@
import {
embeddedAgentLog,
type EmbeddedRunAttemptParams,
} from "openclaw/plugin-sdk/agent-harness-runtime";
import {
isJsonObject,
type CodexServerNotification,
type JsonObject,
type JsonValue,
} from "./protocol.js";
/** Bookkeeping for the one user input request a bridge is currently waiting on. */
type PendingUserInput = {
/** Id of the server request, compared against `serverRequest/resolved` notifications. */
requestId: number | string;
threadId: string;
turnId: string;
itemId: string;
/** Parsed questions, used to map the user's reply onto answer ids. */
questions: UserInputQuestion[];
/** Settles the promise returned from `handleRequest`. */
resolve: (value: JsonValue) => void;
/** Removes the abort listener registered for this request. */
cleanup: () => void;
};
/** One validated question from a user input request. */
type UserInputQuestion = {
/** Key under which the answer is returned. */
id: string;
header: string;
question: string;
/** When true, the prompt adds a line inviting a free-form answer. */
isOther: boolean;
/** When true, the prompt warns that the reply may be visible to other participants. */
isSecret: boolean;
/** Selectable options, or `null` when the request supplied no usable options. */
options: UserInputOption[] | null;
};
/** A selectable option of a question. */
type UserInputOption = {
/** Value returned as the answer when this option is chosen. */
label: string;
/** Explanatory text shown after the label; may be the empty string. */
description: string;
};
/** Public surface of the bridge returned by `createCodexUserInputBridge`. */
export type CodexUserInputBridge = {
/**
 * Handles a user input server request. Resolves with the answer payload, or with `undefined`
 * when the request params are malformed or target a different thread/turn.
 */
handleRequest: (request: {
id: number | string;
params?: JsonValue;
}) => Promise<JsonValue | undefined>;
/** Offers a queued chat message as the answer; returns true when it was consumed. */
handleQueuedMessage: (text: string) => boolean;
/** Observes server notifications so an externally resolved request can be cleared. */
handleNotification: (notification: CodexServerNotification) => void;
/** Settles any pending request with an empty answer set. */
cancelPending: () => void;
};
/**
 * Creates a bridge that turns a Codex user input request into a chat prompt and resolves it from
 * the next queued chat message.
 *
 * At most one request is pending at a time. A pending request is settled by: a queued message
 * (parsed into answers), a matching `serverRequest/resolved` notification, the abort signal,
 * `cancelPending()`, or a newer request superseding it — all but the first with an empty answer set.
 *
 * @param params.paramsForRun run params whose reply callbacks deliver the prompt
 * @param params.threadId thread the bridge is bound to; requests for other threads are ignored
 * @param params.turnId turn the bridge is bound to; requests for other turns are ignored
 * @param params.signal optional abort signal that settles a pending request when aborted
 */
export function createCodexUserInputBridge(params: {
paramsForRun: EmbeddedRunAttemptParams;
threadId: string;
turnId: string;
signal?: AbortSignal;
}): CodexUserInputBridge {
let pending: PendingUserInput | undefined;
// Settles the current pending request, if any. `pending` is cleared first so that nothing
// triggered by cleanup/resolve can settle the same request twice.
const resolvePending = (value: JsonValue) => {
const current = pending;
if (!current) {
return;
}
pending = undefined;
current.cleanup();
current.resolve(value);
};
return {
async handleRequest(request) {
const requestParams = readUserInputParams(request.params);
if (!requestParams) {
return undefined;
}
// Only requests for the thread/turn this bridge was created for are handled.
if (requestParams.threadId !== params.threadId || requestParams.turnId !== params.turnId) {
return undefined;
}
// A new request supersedes any earlier one, which is settled with an empty answer set.
resolvePending(emptyUserInputResponse());
return new Promise<JsonValue>((resolve) => {
const abortListener = () => resolvePending(emptyUserInputResponse());
const cleanup = () => params.signal?.removeEventListener("abort", abortListener);
pending = {
requestId: request.id,
threadId: requestParams.threadId,
turnId: requestParams.turnId,
itemId: requestParams.itemId,
questions: requestParams.questions,
resolve,
cleanup,
};
params.signal?.addEventListener("abort", abortListener, { once: true });
// Covers a signal that was already aborted before the listener was attached.
if (params.signal?.aborted) {
resolvePending(emptyUserInputResponse());
return;
}
// Prompt delivery is not awaited; a delivery failure is only logged and the request
// stays pending.
void deliverUserInputPrompt(params.paramsForRun, requestParams.questions).catch((error) => {
embeddedAgentLog.warn("failed to deliver codex user input prompt", { error });
});
});
},
handleQueuedMessage(text) {
const current = pending;
if (!current) {
// Nothing is waiting for input, so the caller keeps ownership of the message.
return false;
}
resolvePending(buildUserInputResponse(current.questions, text));
return true;
},
handleNotification(notification) {
if (notification.method !== "serverRequest/resolved" || !pending) {
return;
}
const notificationParams = isJsonObject(notification.params)
? notification.params
: undefined;
const requestId = notificationParams ? readRequestId(notificationParams) : undefined;
// Request ids are compared as strings so a numeric id matches its string form.
if (
notificationParams &&
readString(notificationParams, "threadId") === pending.threadId &&
requestId !== undefined &&
String(requestId) === String(pending.requestId)
) {
resolvePending(emptyUserInputResponse());
}
},
cancelPending() {
resolvePending(emptyUserInputResponse());
},
};
}
/**
 * Validates the raw params of a user input request.
 *
 * @returns the thread/turn/item ids and the well-formed questions, or `undefined` when the value
 *   is not an object, any id is missing or empty, or `questions` is not an array. Malformed
 *   question entries are dropped rather than failing the whole request.
 */
function readUserInputParams(value: JsonValue | undefined):
  | {
      threadId: string;
      turnId: string;
      itemId: string;
      questions: UserInputQuestion[];
    }
  | undefined {
  if (!isJsonObject(value)) {
    return undefined;
  }
  const rawQuestions = value.questions;
  const threadId = readString(value, "threadId");
  const turnId = readString(value, "turnId");
  const itemId = readString(value, "itemId");
  if (!threadId || !turnId || !itemId) {
    return undefined;
  }
  if (!Array.isArray(rawQuestions)) {
    return undefined;
  }
  const questions: UserInputQuestion[] = [];
  for (const rawQuestion of rawQuestions) {
    const parsed = readQuestion(rawQuestion);
    if (parsed) {
      questions.push(parsed);
    }
  }
  return { threadId, turnId, itemId, questions };
}
/**
 * Parses one question entry.
 *
 * @returns the question, or `undefined` when the entry is not an object or lacks a non-empty
 *   string `id`, `header`, or `question`. `isOther` / `isSecret` are true only for a literal
 *   `true`; options are parsed by `readOptions`.
 */
function readQuestion(value: JsonValue): UserInputQuestion | undefined {
  if (!isJsonObject(value)) {
    return undefined;
  }
  const id = readString(value, "id");
  if (!id) {
    return undefined;
  }
  const header = readString(value, "header");
  if (!header) {
    return undefined;
  }
  const question = readString(value, "question");
  if (!question) {
    return undefined;
  }
  const parsed: UserInputQuestion = {
    id,
    header,
    question,
    isOther: value.isOther === true,
    isSecret: value.isSecret === true,
    options: readOptions(value.options),
  };
  return parsed;
}
/**
 * Parses the option list of a question.
 *
 * @returns the well-formed options, or `null` when the value is not an array or none of its
 *   entries is a usable option.
 */
function readOptions(value: JsonValue | undefined): UserInputOption[] | null {
  if (!Array.isArray(value)) {
    return null;
  }
  const parsed: UserInputOption[] = [];
  for (const rawOption of value) {
    const option = readOption(rawOption);
    if (option) {
      parsed.push(option);
    }
  }
  return parsed.length === 0 ? null : parsed;
}
/**
 * Parses one option entry.
 *
 * @returns the option, or `undefined` when the entry is not an object or has no non-empty
 *   string `label`. A missing or non-string `description` becomes the empty string.
 */
function readOption(value: JsonValue): UserInputOption | undefined {
  if (!isJsonObject(value)) {
    return undefined;
  }
  const label = readString(value, "label");
  if (!label) {
    return undefined;
  }
  const description = readString(value, "description") ?? "";
  return { label, description };
}
/**
 * Sends the formatted prompt to the chat.
 *
 * Uses `onBlockReply` when the run provides it; otherwise falls back to `onPartialReply` if
 * present. When neither callback exists nothing is sent.
 */
async function deliverUserInputPrompt(
  params: EmbeddedRunAttemptParams,
  questions: UserInputQuestion[],
): Promise<void> {
  const payload = { text: formatUserInputPrompt(questions) };
  if (!params.onBlockReply) {
    await params.onPartialReply?.(payload);
    return;
  }
  await params.onBlockReply(payload);
}
/**
 * Renders the questions as a plain-text chat prompt.
 *
 * Each question is preceded by a blank line and shows its header and question text; headers are
 * numbered only when there is more than one question. Options are listed as a 1-based numbered
 * list, with " - <description>" appended when a description is present.
 */
function formatUserInputPrompt(questions: UserInputQuestion[]): string {
  const numbered = questions.length > 1;
  const out: string[] = ["Codex needs input:"];
  for (const [position, entry] of questions.entries()) {
    const heading = numbered ? `${position + 1}. ${entry.header}` : entry.header;
    out.push("", heading, entry.question);
    if (entry.isSecret) {
      out.push("This channel may show your reply to other participants.");
    }
    for (const [optionPosition, choice] of (entry.options ?? []).entries()) {
      const suffix = choice.description ? ` - ${choice.description}` : "";
      out.push(`${optionPosition + 1}. ${choice.label}${suffix}`);
    }
    if (entry.isOther) {
      out.push("Other: reply with your own answer.");
    }
  }
  return out.join("\n");
}
/**
 * Converts a free-form chat reply into the `{ answers: { [questionId]: { answers } } }` payload
 * for a user input request.
 *
 * - Exactly one question: the whole reply is the answer.
 * - Several questions: every non-empty line is either a keyed answer (its key equals a
 *   question's id, header, question text, or 1-based position, case-insensitively) or a
 *   positional answer. Keyed answers win; questions without one consume the remaining lines in
 *   order. A line used as a keyed answer is never reused as the positional answer of another
 *   question. Questions left without any answer get an empty `answers` array.
 *
 * @param questions questions of the pending request, in prompt order
 * @param inputText raw reply text from the user
 * @returns the answer payload keyed by question id
 */
function buildUserInputResponse(questions: UserInputQuestion[], inputText: string): JsonObject {
  const answers: JsonObject = {};
  if (questions.length === 1) {
    const question = questions[0];
    if (question) {
      answers[question.id] = { answers: [normalizeAnswer(inputText, question)] };
    }
    return { answers };
  }

  // Lookup keys per question, in priority order: id, header, question text, 1-based position.
  const lookupKeys = questions.map((question, index) => [
    question.id.toLowerCase(),
    question.header.toLowerCase(),
    question.question.toLowerCase(),
    String(index + 1),
  ]);
  const knownKeys = new Set(lookupKeys.flat());

  // Split the reply into keyed answers and leftover positional lines. A line only counts as
  // keyed when its key addresses one of the questions; anything else (e.g. a URL, whose scheme
  // looks like a key) stays available as a positional answer.
  const keyed = new Map<string, string>();
  const positional: string[] = [];
  for (const rawLine of inputText.split(/\r?\n/)) {
    const line = rawLine.trim();
    if (!line) {
      continue;
    }
    let addressedQuestion = false;
    for (const [key, value] of parseKeyedAnswers(line)) {
      if (knownKeys.has(key)) {
        keyed.set(key, value);
        addressedQuestion = true;
      }
    }
    if (!addressedQuestion) {
      positional.push(line);
    }
  }

  let nextPositional = 0;
  questions.forEach((question, index) => {
    const keyedAnswer = (lookupKeys[index] ?? [])
      .map((key) => keyed.get(key))
      .find((value) => value !== undefined);
    // Only questions without a keyed answer consume a positional line.
    const answer = keyedAnswer ?? positional[nextPositional++] ?? "";
    answers[question.id] = { answers: answer ? [normalizeAnswer(answer, question)] : [] };
  });
  return { answers };
}

/**
 * Maps a raw answer onto one of the question's options where possible.
 *
 * A purely numeric answer selects the option at that 1-based position; otherwise a
 * case-insensitive label match returns the option's canonical label. Anything else is returned
 * trimmed.
 */
function normalizeAnswer(answer: string, question: UserInputQuestion): string {
  const trimmed = answer.trim();
  const options = question.options ?? [];
  const optionIndex = /^\d+$/.test(trimmed) ? Number(trimmed) - 1 : -1;
  const indexed = optionIndex >= 0 ? options[optionIndex] : undefined;
  if (indexed) {
    return indexed.label;
  }
  const exact = options.find((option) => option.label.toLowerCase() === trimmed.toLowerCase());
  return exact?.label ?? trimmed;
}

/**
 * Extracts `key <sep> value` pairs from the reply, one per line, where `<sep>` is `:`, `=` or `-`.
 *
 * Keys are trimmed and lower-cased; a key cannot itself contain a separator character, so the
 * first `:`, `=` or `-` on the line ends the key. Lines without a separator, or with an empty
 * key or value, are ignored. A later line with the same key overrides an earlier one.
 */
function parseKeyedAnswers(inputText: string): Map<string, string> {
  const answers = new Map<string, string>();
  for (const line of inputText.split(/\r?\n/)) {
    const match = line.match(/^\s*([^:=-]+?)\s*[:=-]\s*(.+?)\s*$/);
    if (!match) {
      continue;
    }
    const key = match[1]?.trim().toLowerCase();
    const value = match[2]?.trim();
    if (key && value) {
      answers.set(key, value);
    }
  }
  return answers;
}
/** Builds a fresh response carrying no answers. */
function emptyUserInputResponse(): JsonObject {
  const response: JsonObject = { answers: {} };
  return response;
}
/** Returns `record[key]` when it is a string (including the empty string), else `undefined`. */
function readString(record: JsonObject, key: string): string | undefined {
  const candidate = record[key];
  if (typeof candidate !== "string") {
    return undefined;
  }
  return candidate;
}
/** Returns `record.requestId` when it is a string or a number, else `undefined`. */
function readRequestId(record: JsonObject): string | number | undefined {
  const candidate = record.requestId;
  switch (typeof candidate) {
    case "string":
    case "number":
      return candidate;
    default:
      return undefined;
  }
}