fix(codex): flush pending steering on completion

This commit is contained in:
Peter Steinberger
2026-04-30 03:06:01 +01:00
parent 1a103088ba
commit 58153d38af
3 changed files with 40 additions and 3 deletions

View File

@@ -1089,6 +1089,31 @@ describe("runCodexAppServerAttempt", () => {
await run;
});
it("flushes pending default queued steering during normal turn cleanup", async () => {
const { requests, waitForMethod, completeTurn } = createStartedThreadHarness();
const run = runCodexAppServerAttempt(
createParams(path.join(tempDir, "session.jsonl"), path.join(tempDir, "workspace")),
);
await waitForMethod("turn/start");
expect(queueAgentHarnessMessage("session-1", "late steer", { debounceMs: 30_000 })).toBe(true);
await completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await run;
expect(requests.filter((entry) => entry.method === "turn/steer")).toEqual([
{
method: "turn/steer",
params: {
threadId: "thread-1",
expectedTurnId: "turn-1",
input: [{ type: "text", text: "late steer", text_elements: [] }],
},
},
]);
});
it("keeps legacy queue steering as separate turn/steer requests", async () => {
const { requests, waitForMethod, completeTurn } = createStartedThreadHarness();

View File

@@ -193,6 +193,9 @@ function createCodexSteeringQueue(params: {
void flushBatch();
}, debounceMs);
},
async flushPending() {
await flushBatch();
},
cancel() {
clearBatchTimer();
batchedTexts = [];
@@ -453,6 +456,7 @@ export async function runCodexAppServerAttempt(
let turnId: string | undefined;
const pendingNotifications: CodexServerNotification[] = [];
let userInputBridge: ReturnType<typeof createCodexUserInputBridge> | undefined;
let steeringQueue: ReturnType<typeof createCodexSteeringQueue> | undefined;
let completed = false;
let timedOut = false;
let turnCompletionIdleTimedOut = false;
@@ -586,6 +590,9 @@ export async function runCodexAppServerAttempt(
});
} finally {
if (isTurnCompletion) {
if (!timedOut && !runAbortController.signal.aborted) {
await steeringQueue?.flushPending();
}
completed = true;
clearTurnCompletionIdleTimer();
resolveCompletion?.();
@@ -814,17 +821,18 @@ export async function runCodexAppServerAttempt(
});
}
const steeringQueue = createCodexSteeringQueue({
const activeSteeringQueue = createCodexSteeringQueue({
client,
threadId: thread.threadId,
turnId: activeTurnId,
answerPendingUserInput: (text) => userInputBridge?.handleQueuedMessage(text) ?? false,
signal: runAbortController.signal,
});
steeringQueue = activeSteeringQueue;
const handle = {
kind: "embedded" as const,
queueMessage: async (text: string, options?: CodexSteeringQueueOptions) =>
steeringQueue.queue(text, options),
activeSteeringQueue.queue(text, options),
isStreaming: () => !completed,
isCompacting: () => projector?.isCompacting() ?? false,
cancel: () => runAbortController.abort("cancelled"),
@@ -991,6 +999,9 @@ export async function runCodexAppServerAttempt(
await trajectoryRecorder?.flush();
},
});
if (!timedOut && !runAbortController.signal.aborted) {
await steeringQueue?.flushPending();
}
userInputBridge?.cancelPending();
clearTimeout(timeout);
clearTurnCompletionIdleTimer();
@@ -999,7 +1010,7 @@ export async function runCodexAppServerAttempt(
nativeHookRelay?.unregister();
runAbortController.signal.removeEventListener("abort", abortListener);
params.abortSignal?.removeEventListener("abort", abortFromUpstream);
steeringQueue.cancel();
steeringQueue?.cancel();
clearActiveEmbeddedRun(params.sessionId, handle, params.sessionKey);
}
}