fix(codex): route btw through native side threads

This commit is contained in:
pashpashpash
2026-05-10 18:24:55 -07:00
committed by Peter Steinberger
parent 4aa13b5338
commit 42e259a696
11 changed files with 1018 additions and 13 deletions

View File

@@ -23,7 +23,7 @@ When you send:
OpenClaw:
1. snapshots the current session context,
2. runs a separate **tool-less** model call,
2. runs a separate ephemeral side query,
3. answers only the side question,
4. leaves the main run alone,
5. does **not** write the BTW question or answer to session history,
@@ -33,17 +33,22 @@ The important mental model is:
- same session context
- separate one-shot side query
- no tool calls
- same native harness transport when the session uses a native harness
- no future context pollution
- no transcript persistence
For Codex harness sessions, BTW stays inside Codex by forking the active
app-server thread as an ephemeral side thread, matching Codex `/side`
semantics. That keeps Codex OAuth, native transport behavior, and Codex's
workspace/tool machinery intact while still isolating the side answer from the
parent transcript. Non-Codex runtimes keep the older direct one-shot path.
## What it does not do
`/btw` does **not**:
- create a new durable session,
- continue the unfinished main task,
- run tools or agent tool loops,
- write BTW question/answer data to transcript history,
- appear in `chat.history`,
- survive a reload.
@@ -60,7 +65,7 @@ explicitly telling the model:
- answer only the side question,
- do not resume or complete the unfinished main task,
- do not emit tool calls or pseudo-tool calls.
- do not steer the parent conversation.
That keeps BTW isolated from the main run while still making it aware of what
the session is about.

View File

@@ -40,6 +40,10 @@ export function createCodexAppServerAgentHarness(options?: {
const { runCodexAppServerAttempt } = await import("./src/app-server/run-attempt.js");
return runCodexAppServerAttempt(params, { pluginConfig: options?.pluginConfig });
},
// Side-question hook: the implementation module is imported on demand, inside
// the call, and receives the harness-level plugin config.
runSideQuestion: async (params) => {
  const sideQuestionModule = await import("./src/app-server/side-question.js");
  return sideQuestionModule.runCodexAppServerSideQuestion(params, {
    pluginConfig: options?.pluginConfig,
  });
},
compact: async (params) => {
const { maybeCompactCodexAppServerSession } = await import("./src/app-server/compact.js");
return maybeCompactCodexAppServerSession(params, { pluginConfig: options?.pluginConfig });

View File

@@ -10,6 +10,7 @@ import type {
CodexDynamicToolCallParams,
CodexErrorNotification,
CodexModelListResponse,
CodexThreadForkResponse,
CodexThreadResumeResponse,
CodexThreadStartResponse,
CodexTurn,
@@ -50,6 +51,14 @@ export function assertCodexThreadStartResponse(value: unknown): CodexThreadStart
);
}
/**
 * Validates a raw `thread/fork` result and returns it as a typed fork response.
 *
 * The value is normalized first and then checked with the `thread/start`
 * response validator; failures are reported under the "thread/fork response"
 * label.
 *
 * @param value - Untyped payload received for a `thread/fork` request.
 * @returns The validated payload.
 */
export function assertCodexThreadForkResponse(value: unknown): CodexThreadForkResponse {
  const normalized = normalizeThreadResponse(value);
  return assertCodexShape(validateThreadStartResponse, normalized, "thread/fork response");
}
export function assertCodexThreadResumeResponse(value: unknown): CodexThreadResumeResponse {
return assertCodexShape(
validateThreadResumeResponse,

View File

@@ -97,12 +97,36 @@ export type CodexThreadStartResponse = {
modelProvider?: string | null;
};
/**
 * Request parameters for `thread/fork`: every `thread/start` parameter plus
 * the fork-specific fields declared here.
 */
export type CodexThreadForkParams = CodexThreadStartParams & {
// Id of the existing thread the fork is created from.
threadId: string;
baseInstructions?: string;
// NOTE(review): presumably asks the server not to persist the forked thread — confirm against the app-server protocol.
ephemeral?: boolean;
threadSource?: string | JsonObject;
// NOTE(review): presumably forks without copying the parent's turns — confirm.
excludeTurns?: boolean;
};
/** Result of `thread/fork`; declared as the same shape as the `thread/start` response. */
export type CodexThreadForkResponse = CodexThreadStartResponse;
/** Result payload of a `thread/resume` request. */
export type CodexThreadResumeResponse = {
thread: CodexThread;
model: string;
modelProvider?: string | null;
};
/** Request parameters for `thread/inject_items`: the target thread and the items to add to it. */
export type CodexThreadInjectItemsParams = JsonObject & {
threadId: string;
items: JsonValue[];
};
/** Request parameters for `thread/unsubscribe`. */
export type CodexThreadUnsubscribeParams = JsonObject & {
threadId: string;
};
/** Request parameters for `turn/interrupt`: identifies the turn by its thread id and turn id. */
export type CodexTurnInterruptParams = JsonObject & {
threadId: string;
turnId: string;
};
export type CodexTurnStartParams = JsonObject & {
threadId: string;
input?: CodexUserInput[];
@@ -401,7 +425,11 @@ export declare namespace v2 {
}
/**
 * Maps app-server request method names to the hand-written parameter types
 * declared in this file.
 * NOTE(review): presumably takes precedence over generated parameter types — confirm where this map is consumed.
 */
type CodexAppServerRequestParamsOverride = {
"thread/fork": CodexThreadForkParams;
"thread/inject_items": CodexThreadInjectItemsParams;
"thread/start": CodexThreadStartParams;
"thread/unsubscribe": CodexThreadUnsubscribeParams;
"turn/interrupt": CodexTurnInterruptParams;
};
type CodexAppServerRequestResultMap = {
@@ -422,9 +450,12 @@ type CodexAppServerRequestResultMap = {
"review/start": JsonValue;
"skills/list": CodexSkillsListResponse;
"thread/compact/start": JsonValue;
"thread/fork": CodexThreadForkResponse;
"thread/inject_items": JsonValue;
"thread/list": JsonValue;
"thread/resume": CodexThreadResumeResponse;
"thread/start": CodexThreadStartResponse;
"thread/unsubscribe": JsonValue;
"turn/interrupt": JsonValue;
"turn/start": CodexTurnStartResponse;
"turn/steer": JsonValue;

View File

@@ -0,0 +1,359 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { CodexServerNotification, RpcRequest } from "./protocol.js";
// Module-level mock functions. The vi.mock factories below delegate to them
// through wrapper arrows, so each test can reprogram behavior per case.
const readCodexAppServerBindingMock = vi.fn();
const isCodexAppServerNativeAuthProfileMock = vi.fn();
const getSharedCodexAppServerClientMock = vi.fn();
const refreshCodexAppServerAuthTokensMock = vi.fn();
// Replace the session-binding module: reads and the native-auth check go to the mocks above.
vi.mock("./session-binding.js", () => ({
clearCodexAppServerBinding: vi.fn(),
isCodexAppServerNativeAuthProfile: (...args: unknown[]) =>
isCodexAppServerNativeAuthProfileMock(...args),
readCodexAppServerBinding: (...args: unknown[]) => readCodexAppServerBindingMock(...args),
writeCodexAppServerBinding: vi.fn(),
}));
// Replace the shared client factory so tests can hand in a fake client.
vi.mock("./shared-client.js", () => ({
getSharedCodexAppServerClient: (...args: unknown[]) => getSharedCodexAppServerClientMock(...args),
}));
// Replace the auth bridge so token refreshes can be observed.
vi.mock("./auth-bridge.js", () => ({
refreshCodexAppServerAuthTokens: (...args: unknown[]) =>
refreshCodexAppServerAuthTokensMock(...args),
}));
// Load the module under test dynamically, after the mock registrations above.
const { runCodexAppServerSideQuestion } = await import("./side-question.js");
/** A server-initiated RPC request as seen by request handlers: `id` and `method` are required, `params` stays optional. */
type ServerRequest = Required<Pick<RpcRequest, "id" | "method">> & {
params?: RpcRequest["params"];
};
/**
 * Minimal stand-in for the app-server client used by these tests: mocked
 * `request`/handler-registration methods, the currently registered handlers,
 * and an `emit` helper that delivers a notification to every registered
 * notification handler.
 */
type FakeClient = {
request: ReturnType<typeof vi.fn>;
addNotificationHandler: ReturnType<typeof vi.fn>;
addRequestHandler: ReturnType<typeof vi.fn>;
notifications: Array<(notification: CodexServerNotification) => void>;
requests: Array<(request: ServerRequest) => unknown>;
emit: (notification: CodexServerNotification) => void;
};
/**
 * Builds a fake app-server client for the side-question tests.
 *
 * Handler registration pushes onto the exposed `notifications` / `requests`
 * arrays and returns a disposer that removes the same handler. The default
 * `request` implementation answers the happy path: `thread/fork` yields the
 * "side-thread" result, `turn/start` schedules a delta plus a completed turn
 * on a microtask, and the inject/unsubscribe/interrupt calls return `{}`.
 * Any other method throws.
 */
function createFakeClient(): FakeClient {
  const notifications: FakeClient["notifications"] = [];
  const requests: FakeClient["requests"] = [];

  // Adds `entry` to `list` and returns a disposer that removes that entry again.
  const register = <T>(list: T[], entry: T): (() => void) => {
    list.push(entry);
    return () => {
      const position = list.indexOf(entry);
      if (position >= 0) {
        list.splice(position, 1);
      }
    };
  };

  const client: FakeClient = {
    notifications,
    requests,
    request: vi.fn(),
    addNotificationHandler: vi.fn((handler: (notification: CodexServerNotification) => void) =>
      register(notifications, handler),
    ),
    addRequestHandler: vi.fn((handler: FakeClient["requests"][number]) =>
      register(requests, handler),
    ),
    emit: (notification) => {
      for (const listener of notifications) {
        listener(notification);
      }
    },
  };

  client.request.mockImplementation(async (method: string) => {
    switch (method) {
      case "thread/fork":
        return threadResult("side-thread");
      case "turn/start":
        // Deliver the streamed answer after the turn/start response has been returned.
        queueMicrotask(() => {
          client.emit(agentDelta("side-thread", "turn-1", "Side answer."));
          client.emit(turnCompleted("side-thread", "turn-1", "Side answer."));
        });
        return turnStartResult("turn-1");
      case "thread/inject_items":
      case "thread/unsubscribe":
      case "turn/interrupt":
        return {};
      default:
        throw new Error(`unexpected request: ${method}`);
    }
  });

  return client;
}
/**
 * Fixture for a thread-level response (used for `thread/fork` in these tests).
 *
 * @param threadId - Used as both the thread id and its session id.
 * @returns A response object describing an idle, ephemeral thread.
 */
function threadResult(threadId: string) {
  const thread = {
    id: threadId,
    sessionId: threadId,
    forkedFromId: null,
    preview: "",
    ephemeral: true,
    modelProvider: "openai",
    createdAt: 1,
    updatedAt: 1,
    status: { type: "idle" },
    path: null,
    cwd: "/tmp/workspace",
    cliVersion: "0.125.0",
    source: "unknown",
    agentNickname: null,
    agentRole: null,
    gitInfo: null,
    name: null,
    turns: [],
  };
  return {
    thread,
    model: "gpt-5.5",
    modelProvider: "openai",
    cwd: "/tmp/workspace",
    approvalPolicy: "never",
    approvalsReviewer: "user",
    sandbox: { type: "dangerFullAccess" },
  };
}
/**
 * Fixture for a `turn/start` response: an in-progress turn on "side-thread"
 * with no items and no timing data yet.
 *
 * @param turnId - Id to assign to the started turn.
 */
function turnStartResult(turnId: string) {
  const turn = {
    id: turnId,
    threadId: "side-thread",
    status: "inProgress",
    items: [],
    error: null,
    startedAt: null,
    completedAt: null,
    durationMs: null,
  };
  return { turn };
}
/**
 * Fixture for an `item/agentMessage/delta` notification carrying one streamed
 * text chunk for item "agent-1".
 */
function agentDelta(threadId: string, turnId: string, delta: string): CodexServerNotification {
  const params = { threadId, turnId, itemId: "agent-1", delta };
  return { method: "item/agentMessage/delta", params };
}
/**
 * Fixture for a `turn/completed` notification whose turn finished successfully
 * with a single agent message containing `text`.
 */
function turnCompleted(threadId: string, turnId: string, text: string): CodexServerNotification {
  const items = [{ id: "agent-1", type: "agentMessage", text }];
  const turn = {
    id: turnId,
    threadId,
    status: "completed",
    items,
    error: null,
    startedAt: null,
    completedAt: null,
    durationMs: null,
  };
  return { method: "turn/completed", params: { threadId, turn } };
}
/**
 * Builds a complete parameter object for `runCodexAppServerSideQuestion`.
 *
 * @param overrides - Fields that replace the defaults; applied last, so they
 *   win over every default value.
 */
function sideParams(overrides: Partial<Parameters<typeof runCodexAppServerSideQuestion>[0]> = {}) {
  const sessionId = "session-1";
  const sessionFile = "/tmp/session-1.jsonl";
  return {
    cfg: {} as never,
    agentDir: "/tmp/agent",
    provider: "openai",
    model: "gpt-5.5",
    question: "What changed?",
    sessionEntry: { sessionId, sessionFile, updatedAt: 1 },
    resolvedReasoningLevel: "off",
    opts: {},
    isNewSession: false,
    sessionId,
    sessionFile,
    workspaceDir: "/tmp/workspace",
    authProfileId: "openai-codex:work",
    authProfileIdSource: "user",
    ...overrides,
  } satisfies Parameters<typeof runCodexAppServerSideQuestion>[0];
}
// Behavior tests for the Codex side-question runner, driven entirely through
// the mocked binding reader, shared client factory, and auth bridge.
describe("runCodexAppServerSideQuestion", () => {
// Reset every mock and install happy-path defaults: a stored binding that
// points at "parent-thread", a fresh fake client, and a successful token refresh.
beforeEach(() => {
readCodexAppServerBindingMock.mockReset();
isCodexAppServerNativeAuthProfileMock.mockReset();
getSharedCodexAppServerClientMock.mockReset();
refreshCodexAppServerAuthTokensMock.mockReset();
readCodexAppServerBindingMock.mockResolvedValue({
schemaVersion: 1,
threadId: "parent-thread",
sessionFile: "/tmp/session-1.jsonl",
cwd: "/tmp/workspace",
authProfileId: "openai-codex:work",
model: "gpt-5.5",
createdAt: new Date(0).toISOString(),
updatedAt: new Date(0).toISOString(),
});
isCodexAppServerNativeAuthProfileMock.mockReturnValue(true);
getSharedCodexAppServerClientMock.mockResolvedValue(createFakeClient());
refreshCodexAppServerAuthTokensMock.mockResolvedValue({
accessToken: "access-token",
chatgptAccountId: "account-1",
chatgptPlanType: "plus",
});
});
// Happy path: checks the request sequence (fork, inject boundary item, start
// turn, unsubscribe) and that no interrupt is sent once the turn completed.
it("forks an ephemeral side thread and returns the completed assistant text", async () => {
const client = createFakeClient();
getSharedCodexAppServerClientMock.mockResolvedValue(client);
const result = await runCodexAppServerSideQuestion(sideParams());
expect(result).toEqual({ text: "Side answer." });
expect(client.request).toHaveBeenNthCalledWith(
1,
"thread/fork",
expect.objectContaining({
threadId: "parent-thread",
model: "gpt-5.5",
ephemeral: true,
threadSource: "user",
persistExtendedHistory: false,
}),
expect.any(Object),
);
// The fork request must not carry a modelProvider in this setup.
expect(client.request.mock.calls[0]?.[1]).not.toHaveProperty("modelProvider");
expect(client.request).toHaveBeenNthCalledWith(
2,
"thread/inject_items",
expect.objectContaining({
threadId: "side-thread",
items: [expect.objectContaining({ type: "message", role: "user" })],
}),
expect.any(Object),
);
expect(client.request).toHaveBeenCalledWith(
"turn/start",
expect.objectContaining({
threadId: "side-thread",
input: [{ type: "text", text: "What changed?", text_elements: [] }],
model: "gpt-5.5",
}),
expect.any(Object),
);
expect(client.request).toHaveBeenLastCalledWith(
"thread/unsubscribe",
{ threadId: "side-thread" },
expect.any(Object),
);
expect(client.request).not.toHaveBeenCalledWith(
"turn/interrupt",
expect.anything(),
expect.anything(),
);
});
// Simulates the server asking for an auth token refresh during the fork call
// by invoking the first registered request handler directly.
it("uses the app-server auth refresh request handler while the side thread is active", async () => {
const client = createFakeClient();
client.request.mockImplementation(async (method: string) => {
if (method === "thread/fork") {
await client.requests[0]?.({
id: 1,
method: "account/chatgptAuthTokens/refresh",
});
return threadResult("side-thread");
}
if (method === "thread/inject_items") {
return {};
}
if (method === "turn/start") {
queueMicrotask(() => client.emit(turnCompleted("side-thread", "turn-1", "Done.")));
return turnStartResult("turn-1");
}
return {};
});
getSharedCodexAppServerClientMock.mockResolvedValue(client);
await runCodexAppServerSideQuestion(sideParams());
expect(refreshCodexAppServerAuthTokensMock).toHaveBeenCalledWith({
agentDir: "/tmp/agent",
authProfileId: "openai-codex:work",
config: {},
});
});
// Without a stored binding the runner must fail before asking for a client.
it("returns a clear setup error when there is no Codex parent thread", async () => {
readCodexAppServerBindingMock.mockResolvedValue(undefined);
await expect(runCodexAppServerSideQuestion(sideParams())).rejects.toThrow(
"Codex /btw needs an active Codex thread. Send a normal message first, then try /btw again.",
);
expect(getSharedCodexAppServerClientMock).not.toHaveBeenCalled();
});
// A fork failure that reports a missing rollout is mapped to the same setup error.
it("returns the same setup error when the persisted parent binding is stale", async () => {
const client = createFakeClient();
client.request.mockImplementation(async (method: string) => {
if (method === "thread/fork") {
throw new Error("thread/fork failed: no rollout found for thread id parent-thread");
}
return {};
});
getSharedCodexAppServerClientMock.mockResolvedValue(client);
await expect(runCodexAppServerSideQuestion(sideParams())).rejects.toThrow(
"Codex /btw needs an active Codex thread. Send a normal message first, then try /btw again.",
);
});
// The abort fires on a microtask after turn/start returns and no completion is
// ever emitted, so cleanup must both interrupt the turn and unsubscribe.
it("interrupts and unsubscribes the ephemeral thread on abort", async () => {
const controller = new AbortController();
const client = createFakeClient();
client.request.mockImplementation(async (method: string) => {
if (method === "thread/fork") {
return threadResult("side-thread");
}
if (method === "thread/inject_items") {
return {};
}
if (method === "turn/start") {
queueMicrotask(() => controller.abort());
return turnStartResult("turn-1");
}
if (method === "turn/interrupt" || method === "thread/unsubscribe") {
return {};
}
throw new Error(`unexpected request: ${method}`);
});
getSharedCodexAppServerClientMock.mockResolvedValue(client);
await expect(
runCodexAppServerSideQuestion(
sideParams({
opts: { abortSignal: controller.signal },
}),
),
).rejects.toThrow("Codex /btw was aborted.");
expect(client.request).toHaveBeenCalledWith(
"turn/interrupt",
{ threadId: "side-thread", turnId: "turn-1" },
expect.any(Object),
);
expect(client.request).toHaveBeenCalledWith(
"thread/unsubscribe",
{ threadId: "side-thread" },
expect.any(Object),
);
});
});

View File

@@ -0,0 +1,473 @@
import {
embeddedAgentLog,
formatErrorMessage,
type AgentHarnessSideQuestionParams,
type AgentHarnessSideQuestionResult,
} from "openclaw/plugin-sdk/agent-harness-runtime";
import { refreshCodexAppServerAuthTokens } from "./auth-bridge.js";
import { type CodexAppServerClient } from "./client.js";
import {
codexSandboxPolicyForTurn,
readCodexPluginConfig,
resolveCodexAppServerRuntimeOptions,
} from "./config.js";
import {
assertCodexThreadForkResponse,
assertCodexTurnStartResponse,
readCodexTurnCompletedNotification,
} from "./protocol-validators.js";
import {
isJsonObject,
type CodexServerNotification,
type CodexThreadForkParams,
type CodexTurn,
type JsonObject,
type JsonValue,
} from "./protocol.js";
import { rememberCodexRateLimits, readRecentCodexRateLimits } from "./rate-limit-cache.js";
import { formatCodexUsageLimitErrorMessage } from "./rate-limits.js";
import { readCodexAppServerBinding } from "./session-binding.js";
import { getSharedCodexAppServerClient } from "./shared-client.js";
import {
buildCodexRuntimeThreadConfig,
resolveCodexAppServerModelProvider,
resolveReasoningEffort,
} from "./thread-lifecycle.js";
// Minimum time (10 minutes) to wait for the side turn to finish; the runner
// uses the larger of this value and the configured turn-completion idle timeout.
const SIDE_QUESTION_COMPLETION_TIMEOUT_MS = 600_000;
// Text of the user message injected into the forked thread before the side
// question is sent, separating inherited history from the new question.
const SIDE_BOUNDARY_PROMPT = [
"Side conversation starts here.",
"The inherited transcript above is reference material only.",
"Answer only the new side question after this boundary.",
"Do not continue, mutate, or complete the parent task unless the side question explicitly asks for that.",
].join("\n");
// Developer instructions passed on the `thread/fork` request for the side thread.
const SIDE_DEVELOPER_INSTRUCTIONS = [
"You are answering an OpenClaw /btw side question in an ephemeral Codex thread.",
"Use inherited conversation history only as context.",
"Keep the answer scoped to the side question and do not steer the parent conversation.",
].join("\n");
/**
 * Answers a /btw side question by forking the session's Codex thread into an
 * ephemeral side thread, running a single turn there, and returning its text.
 *
 * Flow: read the stored thread binding, obtain the shared app-server client,
 * register notification and auth-refresh handlers, fork the parent thread,
 * inject the boundary prompt, start the turn, and wait for completion. The
 * handlers are always removed and the side thread is always cleaned up
 * (interrupting the turn if it did not complete).
 *
 * @param params - Side-question request, including session file, model, question and abort signal.
 * @param options - Optional raw plugin config used to resolve app-server runtime options.
 * @returns The trimmed assistant answer.
 * @throws Error when no parent thread binding exists, when the parent thread
 *   cannot be forked, when the turn fails, is aborted or times out, or when
 *   the turn completes with an empty answer.
 */
export async function runCodexAppServerSideQuestion(
  params: AgentHarnessSideQuestionParams,
  options: { pluginConfig?: unknown } = {},
): Promise<AgentHarnessSideQuestionResult> {
  const binding = await readCodexAppServerBinding(params.sessionFile, {
    agentDir: params.agentDir,
    config: params.cfg,
  });
  if (!binding?.threadId) {
    throw new Error(
      "Codex /btw needs an active Codex thread. Send a normal message first, then try /btw again.",
    );
  }
  const pluginConfig = readCodexPluginConfig(options.pluginConfig);
  const appServer = resolveCodexAppServerRuntimeOptions({ pluginConfig });
  const authProfileId = params.authProfileId ?? binding.authProfileId;
  const client = await getSharedCodexAppServerClient({
    startOptions: appServer.start,
    timeoutMs: appServer.requestTimeoutMs,
    authProfileId,
    agentDir: params.agentDir,
    config: params.cfg,
  });
  const collector = new CodexSideQuestionCollector(params);
  const removeNotificationHandler = client.addNotificationHandler((notification) =>
    collector.handleNotification(notification),
  );
  // Only token-refresh requests are answered here; everything else is left to other handlers.
  const removeRequestHandler = client.addRequestHandler(async (request) => {
    if (request.method !== "account/chatgptAuthTokens/refresh") {
      return undefined;
    }
    return (await refreshCodexAppServerAuthTokens({
      agentDir: params.agentDir,
      authProfileId,
      config: params.cfg,
    })) as unknown as JsonValue;
  });
  let childThreadId: string | undefined;
  let turnId: string | undefined;
  try {
    const cwd = binding.cwd || params.workspaceDir || process.cwd();
    const sandbox = binding.sandbox ?? appServer.sandbox;
    const serviceTier = binding.serviceTier ?? appServer.serviceTier;
    // Resolve the approval policy once so the fork and the turn agree. Previously
    // the turn used only the plugin default and could override the policy stored
    // in the session binding.
    const approvalPolicy = binding.approvalPolicy ?? appServer.approvalPolicy;
    const modelProvider = resolveCodexAppServerModelProvider({
      provider: params.provider,
      authProfileId,
      agentDir: params.agentDir,
      config: params.cfg,
    });
    const forkResponse = assertCodexThreadForkResponse(
      await forkCodexSideThread(
        client,
        {
          threadId: binding.threadId,
          model: params.model,
          ...(modelProvider ? { modelProvider } : {}),
          cwd,
          approvalPolicy,
          approvalsReviewer: appServer.approvalsReviewer,
          sandbox,
          ...(serviceTier ? { serviceTier } : {}),
          config: buildCodexRuntimeThreadConfig(undefined),
          developerInstructions: SIDE_DEVELOPER_INSTRUCTIONS,
          ephemeral: true,
          threadSource: "user",
          persistExtendedHistory: false,
        },
        { timeoutMs: appServer.requestTimeoutMs, signal: params.opts?.abortSignal },
      ),
    );
    childThreadId = forkResponse.thread.id;
    // Mark where inherited history ends before the side question is asked.
    await client.request(
      "thread/inject_items",
      {
        threadId: childThreadId,
        items: [sideBoundaryPromptItem()],
      },
      { timeoutMs: appServer.requestTimeoutMs, signal: params.opts?.abortSignal },
    );
    const effort = resolveReasoningEffort(params.resolvedThinkLevel ?? "off", params.model);
    const turnResponse = assertCodexTurnStartResponse(
      await client.request(
        "turn/start",
        {
          threadId: childThreadId,
          input: [{ type: "text", text: params.question.trim(), text_elements: [] }],
          cwd,
          approvalPolicy,
          approvalsReviewer: appServer.approvalsReviewer,
          sandboxPolicy: codexSandboxPolicyForTurn(sandbox, cwd),
          model: params.model,
          ...(serviceTier ? { serviceTier } : {}),
          effort,
          collaborationMode: {
            mode: "default",
            settings: {
              model: params.model,
              reasoning_effort: effort,
              developer_instructions: null,
            },
          },
        },
        { timeoutMs: appServer.requestTimeoutMs, signal: params.opts?.abortSignal },
      ),
    );
    turnId = turnResponse.turn.id;
    // Binding the collector to the turn also replays notifications buffered so far.
    collector.setTurn(childThreadId, turnId);
    const text = await collector.wait({
      signal: params.opts?.abortSignal,
      timeoutMs: Math.max(
        appServer.turnCompletionIdleTimeoutMs,
        SIDE_QUESTION_COMPLETION_TIMEOUT_MS,
      ),
    });
    const trimmed = text.trim();
    if (!trimmed) {
      throw new Error("Codex /btw completed without an answer.");
    }
    return { text: trimmed };
  } finally {
    removeNotificationHandler();
    removeRequestHandler();
    await cleanupCodexSideThread(client, {
      threadId: childThreadId,
      turnId,
      interrupt: !collector.completed,
      timeoutMs: appServer.requestTimeoutMs,
    });
  }
}
/**
 * Sends `thread/fork` and returns the raw response.
 *
 * Failures that indicate the parent thread is missing are rethrown as the
 * user-facing "needs an active Codex thread" error with the original error
 * attached as `cause`; every other failure propagates unchanged.
 */
async function forkCodexSideThread(
  client: CodexAppServerClient,
  params: CodexThreadForkParams,
  options: { timeoutMs: number; signal?: AbortSignal },
): Promise<unknown> {
  let response: unknown;
  try {
    response = await client.request("thread/fork", params, options);
  } catch (error) {
    if (!isMissingCodexParentThreadError(error)) {
      throw error;
    }
    throw new Error(
      "Codex /btw needs an active Codex thread. Send a normal message first, then try /btw again.",
      { cause: error },
    );
  }
  return response;
}
/**
 * Reports whether an error's formatted message contains one of the phrases
 * this module treats as "the parent thread cannot be forked".
 */
function isMissingCodexParentThreadError(error: unknown): boolean {
  const missingParentMarkers = [
    "no rollout found for thread id",
    "includeTurns is unavailable before first user message",
  ];
  const text = formatErrorMessage(error);
  return missingParentMarkers.some((marker) => text.includes(marker));
}
/** Builds the user-role message item that carries the side-conversation boundary prompt. */
function sideBoundaryPromptItem(): JsonObject {
  const boundaryText = { type: "input_text", text: SIDE_BOUNDARY_PROMPT };
  return { type: "message", role: "user", content: [boundaryText] };
}
/**
 * Best-effort teardown of the side thread.
 *
 * Does nothing when no thread was created. Otherwise optionally interrupts the
 * given turn and then unsubscribes from the thread. Failures of either request
 * are logged at debug level and never thrown.
 */
async function cleanupCodexSideThread(
  client: CodexAppServerClient,
  params: {
    threadId?: string;
    turnId?: string;
    interrupt: boolean;
    timeoutMs: number;
  },
): Promise<void> {
  const { threadId, turnId, interrupt, timeoutMs } = params;
  if (!threadId) {
    return;
  }

  const shouldInterrupt = interrupt && Boolean(turnId);
  if (shouldInterrupt) {
    try {
      await client.request("turn/interrupt", { threadId, turnId }, { timeoutMs });
    } catch (error) {
      embeddedAgentLog.debug("codex /btw side thread interrupt cleanup failed", { error });
    }
  }

  try {
    await client.request("thread/unsubscribe", { threadId }, { timeoutMs });
  } catch (error) {
    embeddedAgentLog.debug("codex /btw side thread unsubscribe cleanup failed", { error });
  }
}
/**
 * Collects the outcome of one side-question turn from app-server notifications.
 *
 * Notifications that arrive before the turn is known are buffered and replayed
 * by {@link setTurn}. After that, only notifications matching the bound thread
 * and turn are processed: deltas accumulate streamed text, `turn/completed`
 * resolves or rejects the result, and non-retryable `error` notifications
 * reject it. {@link wait} exposes the outcome as a promise with abort and
 * timeout support.
 */
class CodexSideQuestionCollector {
  private threadId: string | undefined;
  private turnId: string | undefined;
  /** Notifications received before `setTurn` bound the collector to a turn. */
  private pendingNotifications: CodexServerNotification[] = [];
  private assistantStarted = false;
  /** Concatenation of all streamed deltas, in arrival order. */
  private assistantText = "";
  private finalText: string | undefined;
  private terminalError: Error | undefined;
  private latestRateLimits: JsonValue | undefined;
  /** Resolver pair of the currently pending `wait()` promise, if any. */
  private settle:
    | {
        resolve: (text: string) => void;
        reject: (error: Error) => void;
      }
    | undefined;
  /** True once a `turn/completed` notification for the bound turn was seen (any status). */
  completed = false;

  constructor(private readonly params: AgentHarnessSideQuestionParams) {}

  /** Binds the collector to a thread/turn and replays notifications buffered so far. */
  setTurn(threadId: string, turnId: string): void {
    this.threadId = threadId;
    this.turnId = turnId;
    const pending = this.pendingNotifications;
    this.pendingNotifications = [];
    for (const notification of pending) {
      this.handleNotification(notification);
    }
  }

  /** Processes one server notification; see the class comment for the routing rules. */
  handleNotification(notification: CodexServerNotification): void {
    const params = isJsonObject(notification.params) ? notification.params : undefined;
    if (!params) {
      return;
    }
    // Rate-limit updates are recorded regardless of which turn they relate to.
    if (notification.method === "account/rateLimits/updated") {
      this.latestRateLimits = params;
      rememberCodexRateLimits(params);
      return;
    }
    if (!this.threadId || !this.turnId) {
      this.pendingNotifications.push(notification);
      return;
    }
    if (!isNotificationForTurn(params, this.threadId, this.turnId)) {
      return;
    }
    if (notification.method === "item/agentMessage/delta") {
      this.appendAssistantDelta(params);
      return;
    }
    if (notification.method === "turn/completed") {
      this.completeFromTurn(params);
      return;
    }
    if (
      notification.method === "error" &&
      readBooleanAlias(params, ["willRetry", "will_retry"]) !== true
    ) {
      this.reject(formatCodexErrorMessage(params, this.latestRateLimits));
    }
  }

  /**
   * Returns a promise for the turn's final text.
   *
   * Settles immediately when an outcome is already known or the signal is
   * already aborted; otherwise rejects on abort or after `timeoutMs` (at least
   * 100 ms).
   */
  wait(options: { signal?: AbortSignal; timeoutMs: number }): Promise<string> {
    if (this.terminalError) {
      return Promise.reject(this.terminalError);
    }
    if (this.completed) {
      return Promise.resolve(this.finalText ?? this.assistantText);
    }
    if (options.signal?.aborted) {
      return Promise.reject(new Error("Codex /btw was aborted."));
    }
    return new Promise((resolve, reject) => {
      let timeout: ReturnType<typeof setTimeout> | undefined;
      const cleanup = () => {
        if (timeout) {
          clearTimeout(timeout);
          timeout = undefined;
        }
        options.signal?.removeEventListener("abort", abort);
      };
      const abort = () => {
        cleanup();
        this.settle = undefined;
        reject(new Error("Codex /btw was aborted."));
      };
      timeout = setTimeout(
        () => {
          cleanup();
          this.settle = undefined;
          reject(new Error("Codex /btw timed out waiting for the side thread to finish."));
        },
        Math.max(100, options.timeoutMs),
      );
      // Do not keep the process alive just for this timer.
      timeout.unref?.();
      options.signal?.addEventListener("abort", abort, { once: true });
      this.settle = {
        resolve: (text) => {
          cleanup();
          resolve(text);
        },
        reject: (error) => {
          cleanup();
          reject(error);
        },
      };
    });
  }

  /**
   * Appends one streamed delta and fires the start callback on the first one.
   *
   * The text is appended synchronously, before the callback runs, so deltas
   * keep their arrival order and a `turn/completed` handled in the same tick
   * already sees the streamed text as its fallback.
   */
  private appendAssistantDelta(params: JsonObject): void {
    const delta = readString(params, "delta") ?? "";
    if (!delta) {
      return;
    }
    this.assistantText += delta;
    if (this.assistantStarted) {
      return;
    }
    this.assistantStarted = true;
    void this.notifyAssistantStart();
  }

  /** Invokes the optional start callback; failures are logged so they cannot become unhandled rejections. */
  private async notifyAssistantStart(): Promise<void> {
    try {
      await this.params.opts?.onAssistantMessageStart?.();
    } catch (error) {
      embeddedAgentLog.debug("codex /btw assistant start callback failed", { error });
    }
  }

  /** Handles `turn/completed`: marks completion and resolves or rejects by turn status. */
  private completeFromTurn(params: JsonObject): void {
    const notification = readCodexTurnCompletedNotification(params);
    const turn = notification?.turn;
    if (!turn || turn.id !== this.turnId) {
      return;
    }
    this.completed = true;
    if (turn.status === "failed") {
      this.reject(
        formatCodexUsageLimitErrorMessage({
          message: turn.error?.message,
          codexErrorInfo: turn.error?.codexErrorInfo as JsonValue | null | undefined,
          rateLimits: this.latestRateLimits ?? readRecentCodexRateLimits(),
        }) ??
          turn.error?.message ??
          "Codex /btw side thread failed.",
      );
      return;
    }
    if (turn.status === "interrupted") {
      this.reject("Codex /btw side thread was interrupted.");
      return;
    }
    // Prefer the text from the completed turn's items; fall back to streamed deltas.
    const finalText = collectAssistantText(turn) || this.assistantText;
    this.resolve(finalText);
  }

  /** Records the final text and settles a pending `wait()`, if any. */
  private resolve(text: string): void {
    this.finalText = text;
    const settle = this.settle;
    this.settle = undefined;
    settle?.resolve(text);
  }

  /** Records the terminal error and settles a pending `wait()`, if any. */
  private reject(error: string | Error): void {
    this.terminalError = error instanceof Error ? error : new Error(error);
    const settle = this.settle;
    this.settle = undefined;
    settle?.reject(this.terminalError);
  }
}
/**
 * Returns the trimmed text of the last non-empty `agentMessage` item in the
 * turn, or an empty string when there is none.
 */
function collectAssistantText(turn: CodexTurn): string {
  let lastMessage = "";
  for (const item of turn.items ?? []) {
    if (item.type !== "agentMessage" || typeof item.text !== "string") {
      continue;
    }
    const trimmed = item.text.trim();
    if (trimmed) {
      lastMessage = trimmed;
    }
  }
  return lastMessage;
}
/** True when the notification params name both the given thread and the given turn. */
function isNotificationForTurn(params: JsonObject, threadId: string, turnId: string): boolean {
  if (readString(params, "threadId") !== threadId) {
    return false;
  }
  return readNotificationTurnId(params) === turnId;
}
/** Reads the turn id from a top-level `turnId` string, falling back to the nested `turn.id`. */
function readNotificationTurnId(record: JsonObject): string | undefined {
  const directTurnId = readString(record, "turnId");
  if (directTurnId !== undefined) {
    return directTurnId;
  }
  return readNestedTurnId(record);
}
/** Reads `record.turn.id` when `turn` is a JSON object and `id` is a string. */
function readNestedTurnId(record: JsonObject): string | undefined {
  const { turn } = record;
  if (!isJsonObject(turn)) {
    return undefined;
  }
  return readString(turn, "id");
}
/**
 * Returns the value of the first key in `keys` whose value in `record` is a
 * boolean, or `undefined` when none of them is.
 */
function readBooleanAlias(record: JsonObject, keys: readonly string[]): boolean | undefined {
  return keys
    .map((key) => record[key])
    .find((candidate): candidate is boolean => typeof candidate === "boolean");
}
/** Returns `record[key]` when it is a string, otherwise `undefined`. */
function readString(record: JsonObject, key: string): string | undefined {
  const candidate = record[key];
  if (typeof candidate !== "string") {
    return undefined;
  }
  return candidate;
}
/**
 * Turns the params of an `error` notification into an `Error`.
 *
 * Message precedence: the usage-limit formatter's result, then the nested
 * `error.message`, then the nested `error.error`, then the top-level
 * `message`, then a generic failure text.
 *
 * @param params - Params object of the error notification.
 * @param latestRateLimits - Most recent rate-limit payload seen by the caller;
 *   when absent, the recently remembered rate limits are used instead.
 */
function formatCodexErrorMessage(
  params: JsonObject,
  latestRateLimits: JsonValue | undefined,
): Error {
  const nested = isJsonObject(params.error) ? params.error : undefined;
  const nestedMessage = nested ? readString(nested, "message") : undefined;
  const usageLimitMessage = formatCodexUsageLimitErrorMessage({
    message: nestedMessage,
    codexErrorInfo: nested?.codexErrorInfo,
    rateLimits: latestRateLimits ?? readRecentCodexRateLimits(),
  });
  const nestedFallback = nested ? (nestedMessage ?? readString(nested, "error")) : undefined;
  const text =
    usageLimitMessage ??
    nestedFallback ??
    readString(params, "message") ??
    "Codex /btw side thread failed.";
  return new Error(formatErrorMessage(text));
}

View File

@@ -44,7 +44,7 @@ export type CodexPluginThreadConfigProvider = {
build: () => Promise<CodexPluginThreadConfig>;
};
const CODEX_CODE_MODE_THREAD_CONFIG: JsonObject = {
export const CODEX_CODE_MODE_THREAD_CONFIG: JsonObject = {
"features.code_mode": true,
"features.code_mode_only": true,
};
@@ -348,7 +348,7 @@ export function buildThreadResumeParams(
};
}
function buildCodexRuntimeThreadConfig(config: JsonObject | undefined): JsonObject {
export function buildCodexRuntimeThreadConfig(config: JsonObject | undefined): JsonObject {
return (
mergeCodexThreadConfigs(config, CODEX_CODE_MODE_THREAD_CONFIG) ?? {
...CODEX_CODE_MODE_THREAD_CONFIG,
@@ -528,7 +528,7 @@ function buildUserInput(
];
}
function resolveCodexAppServerModelProvider(params: {
export function resolveCodexAppServerModelProvider(params: {
provider: string;
authProfileId?: string;
authProfileStore?: CodexAppServerAuthProfileLookup["authProfileStore"];

View File

@@ -95,6 +95,7 @@ vi.mock("../logging/diagnostic.js", () => ({
}));
const { runBtwSideQuestion } = await import("./btw.js");
const { clearAgentHarnesses, registerAgentHarness } = await import("./harness/registry.js");
type RunBtwSideQuestionParams = Parameters<typeof runBtwSideQuestion>[0];
const DEFAULT_AGENT_DIR = "/tmp/agent";
@@ -345,6 +346,7 @@ describe("runBtwSideQuestion", () => {
prepareProviderRuntimeAuthMock.mockReset();
registerProviderStreamForModelMock.mockReset();
diagDebugMock.mockReset();
clearAgentHarnesses();
readFileMock.mockResolvedValue("mock transcript");
parseSessionEntriesMock.mockReturnValue([
@@ -471,6 +473,63 @@ describe("runBtwSideQuestion", () => {
expect(ensureArgs?.[2]).toEqual({ workspaceDir: "/tmp/workspace" });
});
// Registers a harness that implements runSideQuestion and checks that the BTW
// runner delegates to it with session/auth details instead of streaming directly.
it("routes Codex-selected BTW questions through the harness side-question hook", async () => {
const codexSideQuestionMock = vi.fn().mockResolvedValue({ text: "Codex side answer." });
registerAgentHarness({
id: "codex",
label: "Codex test harness",
supports: () => ({ supported: true, priority: 100 }),
runAttempt: vi.fn(),
runSideQuestion: codexSideQuestionMock,
});
resolveModelWithRegistryMock.mockReturnValue({
provider: "openai",
id: "gpt-5.5",
api: "openai-responses",
});
resolveSessionAuthProfileOverrideMock.mockResolvedValue("openai-codex:work");
const result = await runSideQuestion({
provider: "openai",
model: "gpt-5.5",
sessionKey: DEFAULT_SESSION_KEY,
});
expect(result).toEqual({ text: "Codex side answer." });
expect(codexSideQuestionMock).toHaveBeenCalledTimes(1);
expect(codexSideQuestionMock.mock.calls[0]?.[0]).toMatchObject({
provider: "openai",
model: "gpt-5.5",
question: DEFAULT_QUESTION,
sessionId: "session-1",
agentId: "main",
workspaceDir: "/tmp/workspace",
authProfileId: "openai-codex:work",
});
expect(codexSideQuestionMock.mock.calls[0]?.[0].sessionFile).toContain("session-1.jsonl");
// The direct provider streaming path must stay untouched.
expect(streamSimpleMock).not.toHaveBeenCalled();
expect(registerProviderStreamForModelMock).not.toHaveBeenCalled();
});
// A selected harness without runSideQuestion must produce an explicit error
// rather than silently using the direct provider streaming path.
it("does not fall back to the direct provider call when Codex lacks BTW support", async () => {
registerAgentHarness({
id: "codex",
label: "Codex test harness",
supports: () => ({ supported: true, priority: 100 }),
runAttempt: vi.fn(),
});
await expect(
runSideQuestion({
provider: "openai",
model: "gpt-5.5",
sessionKey: DEFAULT_SESSION_KEY,
}),
).rejects.toThrow('Selected agent harness "codex" does not support /btw side questions.');
expect(streamSimpleMock).not.toHaveBeenCalled();
expect(registerProviderStreamForModelMock).not.toHaveBeenCalled();
});
it("applies provider runtime auth before streaming github-copilot BTW questions", async () => {
resolveModelWithRegistryMock.mockReturnValue({
provider: "github-copilot",

View File

@@ -17,7 +17,7 @@ import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
import { resolveAgentWorkspaceDir, resolveSessionAgentId } from "./agent-scope.js";
import { resolveSessionAuthProfileOverride } from "./auth-profiles/session-override.js";
import { readBtwTranscriptMessages, resolveBtwSessionTranscriptPath } from "./btw-transcript.js";
import { resolveAgentHarnessPolicy } from "./harness/selection.js";
import { resolveAgentHarnessPolicy, selectAgentHarness } from "./harness/selection.js";
import {
resolveImageSanitizationLimits,
type ImageSanitizationLimits,
@@ -307,6 +307,47 @@ export async function runBtwSideQuestion(
throw new Error("No active session transcript.");
}
const sessionAgentId = resolveSessionAgentId({
sessionKey: params.sessionKey,
config: params.cfg,
});
const workspaceDir = resolveAgentWorkspaceDir(params.cfg, sessionAgentId);
const harness = selectAgentHarness({
provider: params.provider,
modelId: params.model,
config: params.cfg,
agentId: sessionAgentId,
sessionKey: params.sessionKey,
});
if (harness.runSideQuestion) {
const { authProfileId, authProfileIdSource } = await resolveRuntimeModel({
cfg: params.cfg,
provider: params.provider,
model: params.model,
agentId: sessionAgentId,
agentDir: params.agentDir,
workspaceDir,
sessionEntry: params.sessionEntry,
sessionStore: params.sessionStore,
sessionKey: params.sessionKey,
storePath: params.storePath,
isNewSession: params.isNewSession,
});
const result = await harness.runSideQuestion({
...params,
sessionId,
sessionFile,
agentId: sessionAgentId,
workspaceDir,
authProfileId,
authProfileIdSource,
});
return { text: result.text };
}
if (harness.id !== "pi") {
throw new Error(`Selected agent harness "${harness.id}" does not support /btw side questions.`);
}
const activeRunSnapshot = getActiveEmbeddedRunSnapshot(sessionId);
const imageLimits = resolveImageSanitizationLimits(params.cfg);
let messages: Message[] = [];
@@ -334,11 +375,6 @@ export async function runBtwSideQuestion(
throw new Error("No active session context.");
}
const sessionAgentId = resolveSessionAgentId({
sessionKey: params.sessionKey,
config: params.cfg,
});
const workspaceDir = resolveAgentWorkspaceDir(params.cfg, sessionAgentId);
const { model, authProfileId } = await resolveRuntimeModel({
cfg: params.cfg,
provider: params.provider,

View File

@@ -12,6 +12,32 @@ export type AgentHarnessAttemptParams =
import("../pi-embedded-runner/run/types.js").EmbeddedRunAttemptParams;
export type AgentHarnessAttemptResult =
import("../pi-embedded-runner/run/types.js").EmbeddedRunAttemptResult;
/**
 * Input passed to a harness's optional `runSideQuestion` hook.
 * NOTE(review): field meanings below are inferred from names and from how the
 * BTW runner fills them in — confirm before relying on them.
 */
export type AgentHarnessSideQuestionParams = {
cfg: import("../../config/types.openclaw.js").OpenClawConfig;
agentDir: string;
provider: string;
model: string;
// The side question text to answer.
question: string;
sessionEntry: import("../../config/sessions.js").SessionEntry;
sessionStore?: Record<string, import("../../config/sessions.js").SessionEntry>;
sessionKey?: string;
storePath?: string;
resolvedThinkLevel?: import("../../auto-reply/thinking.js").ThinkLevel;
resolvedReasoningLevel: import("../../auto-reply/thinking.js").ReasoningLevel;
blockReplyChunking?: import("../pi-embedded-block-chunker.js").BlockReplyChunking;
resolvedBlockStreamingBreak?: "text_end" | "message_end";
opts?: import("../../auto-reply/get-reply-options.types.js").GetReplyOptions;
isNewSession: boolean;
sessionId: string;
sessionFile: string;
agentId?: string;
workspaceDir?: string;
authProfileId?: string;
// Whether the auth profile was picked automatically or chosen by the user.
authProfileIdSource?: "auto" | "user";
};
/** Result of a harness side question: the answer text. */
export type AgentHarnessSideQuestionResult = {
text: string;
};
export type AgentHarnessCompactParams =
import("../pi-embedded-runner/compact.types.js").CompactEmbeddedPiSessionParams;
export type AgentHarnessCompactResult =
@@ -42,6 +68,7 @@ export type AgentHarness = {
deliveryDefaults?: AgentHarnessDeliveryDefaults;
supports(ctx: AgentHarnessSupportContext): AgentHarnessSupport;
runAttempt(params: AgentHarnessAttemptParams): Promise<AgentHarnessAttemptResult>;
runSideQuestion?(params: AgentHarnessSideQuestionParams): Promise<AgentHarnessSideQuestionResult>;
classify?(
result: AgentHarnessAttemptResult,
ctx: AgentHarnessAttemptParams,

View File

@@ -25,6 +25,8 @@ export type {
AgentHarnessCompactResult,
AgentHarnessDeliveryDefaults,
AgentHarnessResultClassification,
AgentHarnessSideQuestionParams,
AgentHarnessSideQuestionResult,
AgentHarnessResetParams,
AgentHarnessSupport,
AgentHarnessSupportContext,