feat: default active steering to batched delivery

This commit is contained in:
Peter Steinberger
2026-04-30 01:22:20 +01:00
parent fabfab2b84
commit 30a2b3049a
39 changed files with 520 additions and 66 deletions

View File

@@ -1056,6 +1056,81 @@ describe("runCodexAppServerAttempt", () => {
);
});
it("batches default queued steering before sending turn/steer", async () => {
const { requests, waitForMethod, completeTurn } = createStartedThreadHarness();
const run = runCodexAppServerAttempt(
createParams(path.join(tempDir, "session.jsonl"), path.join(tempDir, "workspace")),
);
await waitForMethod("turn/start");
expect(queueAgentHarnessMessage("session-1", "first", { debounceMs: 5 })).toBe(true);
expect(queueAgentHarnessMessage("session-1", "second", { debounceMs: 5 })).toBe(true);
await vi.waitFor(
() =>
expect(requests.filter((entry) => entry.method === "turn/steer")).toEqual([
{
method: "turn/steer",
params: {
threadId: "thread-1",
expectedTurnId: "turn-1",
input: [
{ type: "text", text: "first", text_elements: [] },
{ type: "text", text: "second", text_elements: [] },
],
},
},
]),
{ interval: 1 },
);
await completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await run;
});
it("keeps legacy queue steering as separate turn/steer requests", async () => {
const { requests, waitForMethod, completeTurn } = createStartedThreadHarness();
const run = runCodexAppServerAttempt(
createParams(path.join(tempDir, "session.jsonl"), path.join(tempDir, "workspace")),
);
await waitForMethod("turn/start");
expect(queueAgentHarnessMessage("session-1", "first", { steeringMode: "one-at-a-time" })).toBe(
true,
);
expect(queueAgentHarnessMessage("session-1", "second", { steeringMode: "one-at-a-time" })).toBe(
true,
);
await vi.waitFor(
() =>
expect(requests.filter((entry) => entry.method === "turn/steer")).toEqual([
{
method: "turn/steer",
params: {
threadId: "thread-1",
expectedTurnId: "turn-1",
input: [{ type: "text", text: "first", text_elements: [] }],
},
},
{
method: "turn/steer",
params: {
threadId: "thread-1",
expectedTurnId: "turn-1",
input: [{ type: "text", text: "second", text_elements: [] }],
},
},
]),
{ interval: 1 },
);
await completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await run;
});
it("routes MCP approval elicitations through the native bridge", async () => {
let notify: (notification: CodexServerNotification) => Promise<void> = async () => undefined;
let handleRequest:

View File

@@ -59,6 +59,7 @@ import {
readCodexDynamicToolCallParams,
} from "./protocol-validators.js";
import {
type CodexUserInput,
isJsonObject,
type CodexServerNotification,
type CodexDynamicToolCallParams,
@@ -86,6 +87,7 @@ import { filterToolsForVisionInputs } from "./vision-tools.js";
const CODEX_DYNAMIC_TOOL_TIMEOUT_MS = 30_000;
const CODEX_TURN_COMPLETION_IDLE_TIMEOUT_MS = 60_000;
const CODEX_STEER_ALL_DEBOUNCE_MS = 500;
type OpenClawCodingToolsOptions = NonNullable<
Parameters<(typeof import("openclaw/plugin-sdk/agent-harness"))["createOpenClawCodingTools"]>[0]
@@ -123,6 +125,91 @@ function collectTerminalAssistantText(result: EmbeddedRunAttemptResult): string
return result.assistantTexts.join("\n\n").trim();
}
type CodexSteeringQueueOptions = {
steeringMode?: "all" | "one-at-a-time";
debounceMs?: number;
};
function createCodexSteeringQueue(params: {
client: CodexAppServerClient;
threadId: string;
turnId: string;
answerPendingUserInput: (text: string) => boolean;
signal: AbortSignal;
}) {
let batchedTexts: string[] = [];
let batchTimer: NodeJS.Timeout | undefined;
let sendChain: Promise<void> = Promise.resolve();
const clearBatchTimer = () => {
if (batchTimer) {
clearTimeout(batchTimer);
batchTimer = undefined;
}
};
const sendTexts = async (texts: string[]) => {
if (texts.length === 0 || params.signal.aborted) {
return;
}
await params.client.request("turn/steer", {
threadId: params.threadId,
expectedTurnId: params.turnId,
input: texts.map(toCodexTextInput),
});
};
const enqueueSend = (texts: string[]) => {
sendChain = sendChain
.then(() => sendTexts(texts))
.catch((error: unknown) => {
embeddedAgentLog.debug("codex app-server queued steer failed", { error });
});
return sendChain;
};
const flushBatch = () => {
clearBatchTimer();
const texts = batchedTexts;
batchedTexts = [];
return enqueueSend(texts);
};
return {
async queue(text: string, options?: CodexSteeringQueueOptions) {
if (params.answerPendingUserInput(text)) {
return;
}
if (options?.steeringMode === "one-at-a-time") {
await flushBatch();
await enqueueSend([text]);
return;
}
batchedTexts.push(text);
clearBatchTimer();
const debounceMs = normalizeCodexSteerDebounceMs(options?.debounceMs);
batchTimer = setTimeout(() => {
batchTimer = undefined;
void flushBatch();
}, debounceMs);
},
cancel() {
clearBatchTimer();
batchedTexts = [];
},
};
}
function normalizeCodexSteerDebounceMs(value: number | undefined): number {
return typeof value === "number" && Number.isFinite(value) && value >= 0
? Math.floor(value)
: CODEX_STEER_ALL_DEBOUNCE_MS;
}
function toCodexTextInput(text: string): CodexUserInput {
return { type: "text", text, text_elements: [] };
}
export async function runCodexAppServerAttempt(
params: EmbeddedRunAttemptParams,
options: {
@@ -727,18 +814,17 @@ export async function runCodexAppServerAttempt(
});
}
const steeringQueue = createCodexSteeringQueue({
client,
threadId: thread.threadId,
turnId: activeTurnId,
answerPendingUserInput: (text) => userInputBridge?.handleQueuedMessage(text) ?? false,
signal: runAbortController.signal,
});
const handle = {
kind: "embedded" as const,
queueMessage: async (text: string) => {
if (userInputBridge?.handleQueuedMessage(text)) {
return;
}
await client.request("turn/steer", {
threadId: thread.threadId,
expectedTurnId: activeTurnId,
input: [{ type: "text", text, text_elements: [] }],
});
},
queueMessage: async (text: string, options?: CodexSteeringQueueOptions) =>
steeringQueue.queue(text, options),
isStreaming: () => !completed,
isCompacting: () => projector?.isCompacting() ?? false,
cancel: () => runAbortController.abort("cancelled"),
@@ -913,6 +999,7 @@ export async function runCodexAppServerAttempt(
nativeHookRelay?.unregister();
runAbortController.signal.removeEventListener("abort", abortListener);
params.abortSignal?.removeEventListener("abort", abortFromUpstream);
steeringQueue.cancel();
clearActiveEmbeddedRun(params.sessionId, handle, params.sessionKey);
}
}