diff --git a/docs/tools/btw.md b/docs/tools/btw.md index f7b07b4b0e1..4168a3c3638 100644 --- a/docs/tools/btw.md +++ b/docs/tools/btw.md @@ -23,7 +23,7 @@ When you send: OpenClaw: 1. snapshots the current session context, -2. runs a separate **tool-less** model call, +2. runs a separate ephemeral side query, 3. answers only the side question, 4. leaves the main run alone, 5. does **not** write the BTW question or answer to session history, @@ -33,17 +33,22 @@ The important mental model is: - same session context - separate one-shot side query -- no tool calls +- same native harness transport when the session uses a native harness - no future context pollution - no transcript persistence +For Codex harness sessions, BTW stays inside Codex by forking the active +app-server thread as an ephemeral side thread, matching Codex `/side` +semantics. That keeps Codex OAuth, native transport behavior, and Codex's +workspace/tool machinery intact while still isolating the side answer from the +parent transcript. Non-Codex runtimes keep the older direct one-shot path. + ## What it does not do `/btw` does **not**: - create a new durable session, - continue the unfinished main task, -- run tools or agent tool loops, - write BTW question/answer data to transcript history, - appear in `chat.history`, - survive a reload. @@ -60,7 +65,7 @@ explicitly telling the model: - answer only the side question, - do not resume or complete the unfinished main task, -- do not emit tool calls or pseudo-tool calls. +- do not steer the parent conversation. That keeps BTW isolated from the main run while still making it aware of what the session is about. 
/**
 * Validates the raw result of a `thread/fork` request.
 *
 * The value is first passed through `normalizeThreadResponse` and then checked
 * with the `thread/start` response validator, using "thread/fork response" as
 * the label handed to `assertCodexShape`.
 *
 * @param value - Untyped payload returned by the app-server for `thread/fork`.
 * @returns The payload typed as a {@link CodexThreadForkResponse}.
 */
export function assertCodexThreadForkResponse(value: unknown): CodexThreadForkResponse {
  const normalized = normalizeThreadResponse(value);
  return assertCodexShape(validateThreadStartResponse, normalized, "thread/fork response");
}
a/extensions/codex/src/app-server/protocol.ts +++ b/extensions/codex/src/app-server/protocol.ts @@ -97,12 +97,36 @@ export type CodexThreadStartResponse = { modelProvider?: string | null; }; +export type CodexThreadForkParams = CodexThreadStartParams & { + threadId: string; + baseInstructions?: string; + ephemeral?: boolean; + threadSource?: string | JsonObject; + excludeTurns?: boolean; +}; + +export type CodexThreadForkResponse = CodexThreadStartResponse; + export type CodexThreadResumeResponse = { thread: CodexThread; model: string; modelProvider?: string | null; }; +export type CodexThreadInjectItemsParams = JsonObject & { + threadId: string; + items: JsonValue[]; +}; + +export type CodexThreadUnsubscribeParams = JsonObject & { + threadId: string; +}; + +export type CodexTurnInterruptParams = JsonObject & { + threadId: string; + turnId: string; +}; + export type CodexTurnStartParams = JsonObject & { threadId: string; input?: CodexUserInput[]; @@ -401,7 +425,11 @@ export declare namespace v2 { } type CodexAppServerRequestParamsOverride = { + "thread/fork": CodexThreadForkParams; + "thread/inject_items": CodexThreadInjectItemsParams; "thread/start": CodexThreadStartParams; + "thread/unsubscribe": CodexThreadUnsubscribeParams; + "turn/interrupt": CodexTurnInterruptParams; }; type CodexAppServerRequestResultMap = { @@ -422,9 +450,12 @@ type CodexAppServerRequestResultMap = { "review/start": JsonValue; "skills/list": CodexSkillsListResponse; "thread/compact/start": JsonValue; + "thread/fork": CodexThreadForkResponse; + "thread/inject_items": JsonValue; "thread/list": JsonValue; "thread/resume": CodexThreadResumeResponse; "thread/start": CodexThreadStartResponse; + "thread/unsubscribe": JsonValue; "turn/interrupt": JsonValue; "turn/start": CodexTurnStartResponse; "turn/steer": JsonValue; diff --git a/extensions/codex/src/app-server/side-question.test.ts b/extensions/codex/src/app-server/side-question.test.ts new file mode 100644 index 00000000000..8a6a383d5e5 --- 
/dev/null +++ b/extensions/codex/src/app-server/side-question.test.ts @@ -0,0 +1,359 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { CodexServerNotification, RpcRequest } from "./protocol.js"; + +const readCodexAppServerBindingMock = vi.fn(); +const isCodexAppServerNativeAuthProfileMock = vi.fn(); +const getSharedCodexAppServerClientMock = vi.fn(); +const refreshCodexAppServerAuthTokensMock = vi.fn(); + +vi.mock("./session-binding.js", () => ({ + clearCodexAppServerBinding: vi.fn(), + isCodexAppServerNativeAuthProfile: (...args: unknown[]) => + isCodexAppServerNativeAuthProfileMock(...args), + readCodexAppServerBinding: (...args: unknown[]) => readCodexAppServerBindingMock(...args), + writeCodexAppServerBinding: vi.fn(), +})); + +vi.mock("./shared-client.js", () => ({ + getSharedCodexAppServerClient: (...args: unknown[]) => getSharedCodexAppServerClientMock(...args), +})); + +vi.mock("./auth-bridge.js", () => ({ + refreshCodexAppServerAuthTokens: (...args: unknown[]) => + refreshCodexAppServerAuthTokensMock(...args), +})); + +const { runCodexAppServerSideQuestion } = await import("./side-question.js"); + +type ServerRequest = Required> & { + params?: RpcRequest["params"]; +}; + +type FakeClient = { + request: ReturnType; + addNotificationHandler: ReturnType; + addRequestHandler: ReturnType; + notifications: Array<(notification: CodexServerNotification) => void>; + requests: Array<(request: ServerRequest) => unknown>; + emit: (notification: CodexServerNotification) => void; +}; + +function createFakeClient(): FakeClient { + const notifications: FakeClient["notifications"] = []; + const requests: FakeClient["requests"] = []; + const client: FakeClient = { + notifications, + requests, + request: vi.fn(), + addNotificationHandler: vi.fn((handler: (notification: CodexServerNotification) => void) => { + notifications.push(handler); + return () => { + const index = notifications.indexOf(handler); + if (index >= 0) { + 
notifications.splice(index, 1); + } + }; + }), + addRequestHandler: vi.fn((handler: FakeClient["requests"][number]) => { + requests.push(handler); + return () => { + const index = requests.indexOf(handler); + if (index >= 0) { + requests.splice(index, 1); + } + }; + }), + emit: (notification) => { + for (const handler of notifications) { + handler(notification); + } + }, + }; + client.request.mockImplementation(async (method: string) => { + if (method === "thread/fork") { + return threadResult("side-thread"); + } + if (method === "thread/inject_items") { + return {}; + } + if (method === "turn/start") { + queueMicrotask(() => { + client.emit(agentDelta("side-thread", "turn-1", "Side answer.")); + client.emit(turnCompleted("side-thread", "turn-1", "Side answer.")); + }); + return turnStartResult("turn-1"); + } + if (method === "thread/unsubscribe" || method === "turn/interrupt") { + return {}; + } + throw new Error(`unexpected request: ${method}`); + }); + return client; +} + +function threadResult(threadId: string) { + return { + thread: { + id: threadId, + sessionId: threadId, + forkedFromId: null, + preview: "", + ephemeral: true, + modelProvider: "openai", + createdAt: 1, + updatedAt: 1, + status: { type: "idle" }, + path: null, + cwd: "/tmp/workspace", + cliVersion: "0.125.0", + source: "unknown", + agentNickname: null, + agentRole: null, + gitInfo: null, + name: null, + turns: [], + }, + model: "gpt-5.5", + modelProvider: "openai", + cwd: "/tmp/workspace", + approvalPolicy: "never", + approvalsReviewer: "user", + sandbox: { type: "dangerFullAccess" }, + }; +} + +function turnStartResult(turnId: string) { + return { + turn: { + id: turnId, + threadId: "side-thread", + status: "inProgress", + items: [], + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }; +} + +function agentDelta(threadId: string, turnId: string, delta: string): CodexServerNotification { + return { + method: "item/agentMessage/delta", + params: { threadId, 
turnId, itemId: "agent-1", delta }, + }; +} + +function turnCompleted(threadId: string, turnId: string, text: string): CodexServerNotification { + return { + method: "turn/completed", + params: { + threadId, + turn: { + id: turnId, + threadId, + status: "completed", + items: [{ id: "agent-1", type: "agentMessage", text }], + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }, + }; +} + +function sideParams(overrides: Partial[0]> = {}) { + return { + cfg: {} as never, + agentDir: "/tmp/agent", + provider: "openai", + model: "gpt-5.5", + question: "What changed?", + sessionEntry: { + sessionId: "session-1", + sessionFile: "/tmp/session-1.jsonl", + updatedAt: 1, + }, + resolvedReasoningLevel: "off", + opts: {}, + isNewSession: false, + sessionId: "session-1", + sessionFile: "/tmp/session-1.jsonl", + workspaceDir: "/tmp/workspace", + authProfileId: "openai-codex:work", + authProfileIdSource: "user", + ...overrides, + } satisfies Parameters[0]; +} + +describe("runCodexAppServerSideQuestion", () => { + beforeEach(() => { + readCodexAppServerBindingMock.mockReset(); + isCodexAppServerNativeAuthProfileMock.mockReset(); + getSharedCodexAppServerClientMock.mockReset(); + refreshCodexAppServerAuthTokensMock.mockReset(); + + readCodexAppServerBindingMock.mockResolvedValue({ + schemaVersion: 1, + threadId: "parent-thread", + sessionFile: "/tmp/session-1.jsonl", + cwd: "/tmp/workspace", + authProfileId: "openai-codex:work", + model: "gpt-5.5", + createdAt: new Date(0).toISOString(), + updatedAt: new Date(0).toISOString(), + }); + isCodexAppServerNativeAuthProfileMock.mockReturnValue(true); + getSharedCodexAppServerClientMock.mockResolvedValue(createFakeClient()); + refreshCodexAppServerAuthTokensMock.mockResolvedValue({ + accessToken: "access-token", + chatgptAccountId: "account-1", + chatgptPlanType: "plus", + }); + }); + + it("forks an ephemeral side thread and returns the completed assistant text", async () => { + const client = 
createFakeClient(); + getSharedCodexAppServerClientMock.mockResolvedValue(client); + + const result = await runCodexAppServerSideQuestion(sideParams()); + + expect(result).toEqual({ text: "Side answer." }); + expect(client.request).toHaveBeenNthCalledWith( + 1, + "thread/fork", + expect.objectContaining({ + threadId: "parent-thread", + model: "gpt-5.5", + ephemeral: true, + threadSource: "user", + persistExtendedHistory: false, + }), + expect.any(Object), + ); + expect(client.request.mock.calls[0]?.[1]).not.toHaveProperty("modelProvider"); + expect(client.request).toHaveBeenNthCalledWith( + 2, + "thread/inject_items", + expect.objectContaining({ + threadId: "side-thread", + items: [expect.objectContaining({ type: "message", role: "user" })], + }), + expect.any(Object), + ); + expect(client.request).toHaveBeenCalledWith( + "turn/start", + expect.objectContaining({ + threadId: "side-thread", + input: [{ type: "text", text: "What changed?", text_elements: [] }], + model: "gpt-5.5", + }), + expect.any(Object), + ); + expect(client.request).toHaveBeenLastCalledWith( + "thread/unsubscribe", + { threadId: "side-thread" }, + expect.any(Object), + ); + expect(client.request).not.toHaveBeenCalledWith( + "turn/interrupt", + expect.anything(), + expect.anything(), + ); + }); + + it("uses the app-server auth refresh request handler while the side thread is active", async () => { + const client = createFakeClient(); + client.request.mockImplementation(async (method: string) => { + if (method === "thread/fork") { + await client.requests[0]?.({ + id: 1, + method: "account/chatgptAuthTokens/refresh", + }); + return threadResult("side-thread"); + } + if (method === "thread/inject_items") { + return {}; + } + if (method === "turn/start") { + queueMicrotask(() => client.emit(turnCompleted("side-thread", "turn-1", "Done."))); + return turnStartResult("turn-1"); + } + return {}; + }); + getSharedCodexAppServerClientMock.mockResolvedValue(client); + + await 
runCodexAppServerSideQuestion(sideParams()); + + expect(refreshCodexAppServerAuthTokensMock).toHaveBeenCalledWith({ + agentDir: "/tmp/agent", + authProfileId: "openai-codex:work", + config: {}, + }); + }); + + it("returns a clear setup error when there is no Codex parent thread", async () => { + readCodexAppServerBindingMock.mockResolvedValue(undefined); + + await expect(runCodexAppServerSideQuestion(sideParams())).rejects.toThrow( + "Codex /btw needs an active Codex thread. Send a normal message first, then try /btw again.", + ); + expect(getSharedCodexAppServerClientMock).not.toHaveBeenCalled(); + }); + + it("returns the same setup error when the persisted parent binding is stale", async () => { + const client = createFakeClient(); + client.request.mockImplementation(async (method: string) => { + if (method === "thread/fork") { + throw new Error("thread/fork failed: no rollout found for thread id parent-thread"); + } + return {}; + }); + getSharedCodexAppServerClientMock.mockResolvedValue(client); + + await expect(runCodexAppServerSideQuestion(sideParams())).rejects.toThrow( + "Codex /btw needs an active Codex thread. 
Send a normal message first, then try /btw again.", + ); + }); + + it("interrupts and unsubscribes the ephemeral thread on abort", async () => { + const controller = new AbortController(); + const client = createFakeClient(); + client.request.mockImplementation(async (method: string) => { + if (method === "thread/fork") { + return threadResult("side-thread"); + } + if (method === "thread/inject_items") { + return {}; + } + if (method === "turn/start") { + queueMicrotask(() => controller.abort()); + return turnStartResult("turn-1"); + } + if (method === "turn/interrupt" || method === "thread/unsubscribe") { + return {}; + } + throw new Error(`unexpected request: ${method}`); + }); + getSharedCodexAppServerClientMock.mockResolvedValue(client); + + await expect( + runCodexAppServerSideQuestion( + sideParams({ + opts: { abortSignal: controller.signal }, + }), + ), + ).rejects.toThrow("Codex /btw was aborted."); + expect(client.request).toHaveBeenCalledWith( + "turn/interrupt", + { threadId: "side-thread", turnId: "turn-1" }, + expect.any(Object), + ); + expect(client.request).toHaveBeenCalledWith( + "thread/unsubscribe", + { threadId: "side-thread" }, + expect.any(Object), + ); + }); +}); diff --git a/extensions/codex/src/app-server/side-question.ts b/extensions/codex/src/app-server/side-question.ts new file mode 100644 index 00000000000..6d7e26849d7 --- /dev/null +++ b/extensions/codex/src/app-server/side-question.ts @@ -0,0 +1,473 @@ +import { + embeddedAgentLog, + formatErrorMessage, + type AgentHarnessSideQuestionParams, + type AgentHarnessSideQuestionResult, +} from "openclaw/plugin-sdk/agent-harness-runtime"; +import { refreshCodexAppServerAuthTokens } from "./auth-bridge.js"; +import { type CodexAppServerClient } from "./client.js"; +import { + codexSandboxPolicyForTurn, + readCodexPluginConfig, + resolveCodexAppServerRuntimeOptions, +} from "./config.js"; +import { + assertCodexThreadForkResponse, + assertCodexTurnStartResponse, + 
/**
 * Answers an OpenClaw `/btw` side question inside an ephemeral fork of the
 * session's bound Codex app-server thread.
 *
 * Steps, in order:
 * 1. read the session binding and require a parent `threadId`;
 * 2. obtain the shared app-server client and register a notification handler
 *    (feeding the collector) plus a request handler that only answers
 *    `account/chatgptAuthTokens/refresh`;
 * 3. `thread/fork` the parent with `ephemeral: true`, inject the side-boundary
 *    prompt, and `turn/start` the question;
 * 4. wait for the collector to settle and return the trimmed answer.
 *
 * The `finally` block always removes both handlers and calls
 * `cleanupCodexSideThread`, requesting an interrupt when the collector never
 * saw the turn complete.
 *
 * @param params - Side-question request (question, model, session file, auth
 *   profile, optional abort signal in `opts`).
 * @param options - Optional raw plugin config forwarded to `readCodexPluginConfig`.
 * @returns The trimmed assistant answer.
 * @throws Error when no parent thread is bound, when the turn finishes with an
 *   empty answer, or when forking, the turn, the abort signal, or the
 *   completion timeout fails the run.
 */
export async function runCodexAppServerSideQuestion(
  params: AgentHarnessSideQuestionParams,
  options: { pluginConfig?: unknown } = {},
): Promise<AgentHarnessSideQuestionResult> {
  const binding = await readCodexAppServerBinding(params.sessionFile, {
    agentDir: params.agentDir,
    config: params.cfg,
  });
  if (!binding?.threadId) {
    throw new Error(
      "Codex /btw needs an active Codex thread. Send a normal message first, then try /btw again.",
    );
  }

  const pluginConfig = readCodexPluginConfig(options.pluginConfig);
  const appServer = resolveCodexAppServerRuntimeOptions({ pluginConfig });
  const authProfileId = params.authProfileId ?? binding.authProfileId;
  const client = await getSharedCodexAppServerClient({
    startOptions: appServer.start,
    timeoutMs: appServer.requestTimeoutMs,
    authProfileId,
    agentDir: params.agentDir,
    config: params.cfg,
  });
  const collector = new CodexSideQuestionCollector(params);
  // Handlers are attached before the fork so nothing emitted while the side
  // thread is being set up is missed; the collector buffers until setTurn().
  const removeNotificationHandler = client.addNotificationHandler((notification) =>
    collector.handleNotification(notification),
  );
  const removeRequestHandler = client.addRequestHandler(async (request) => {
    if (request.method !== "account/chatgptAuthTokens/refresh") {
      return undefined;
    }
    return (await refreshCodexAppServerAuthTokens({
      agentDir: params.agentDir,
      authProfileId,
      config: params.cfg,
    })) as unknown as JsonValue;
  });

  let childThreadId: string | undefined;
  let turnId: string | undefined;
  try {
    const cwd = binding.cwd || params.workspaceDir || process.cwd();
    const sandbox = binding.sandbox ?? appServer.sandbox;
    const serviceTier = binding.serviceTier ?? appServer.serviceTier;
    // Resolved once, like sandbox and serviceTier, so the fork and the turn
    // agree. Previously turn/start ignored the binding's policy.
    // NOTE(review): confirm this matches how the main run-attempt path picks
    // the per-turn approval policy.
    const approvalPolicy = binding.approvalPolicy ?? appServer.approvalPolicy;
    const modelProvider = resolveCodexAppServerModelProvider({
      provider: params.provider,
      authProfileId,
      agentDir: params.agentDir,
      config: params.cfg,
    });
    const forkResponse = assertCodexThreadForkResponse(
      await forkCodexSideThread(
        client,
        {
          threadId: binding.threadId,
          model: params.model,
          ...(modelProvider ? { modelProvider } : {}),
          cwd,
          approvalPolicy,
          approvalsReviewer: appServer.approvalsReviewer,
          sandbox,
          ...(serviceTier ? { serviceTier } : {}),
          config: buildCodexRuntimeThreadConfig(undefined),
          developerInstructions: SIDE_DEVELOPER_INSTRUCTIONS,
          ephemeral: true,
          threadSource: "user",
          persistExtendedHistory: false,
        },
        { timeoutMs: appServer.requestTimeoutMs, signal: params.opts?.abortSignal },
      ),
    );
    childThreadId = forkResponse.thread.id;

    // Mark where the inherited transcript ends and the side question begins.
    await client.request(
      "thread/inject_items",
      {
        threadId: childThreadId,
        items: [sideBoundaryPromptItem()],
      },
      { timeoutMs: appServer.requestTimeoutMs, signal: params.opts?.abortSignal },
    );

    const effort = resolveReasoningEffort(params.resolvedThinkLevel ?? "off", params.model);
    const turnResponse = assertCodexTurnStartResponse(
      await client.request(
        "turn/start",
        {
          threadId: childThreadId,
          input: [{ type: "text", text: params.question.trim(), text_elements: [] }],
          cwd,
          approvalPolicy,
          approvalsReviewer: appServer.approvalsReviewer,
          sandboxPolicy: codexSandboxPolicyForTurn(sandbox, cwd),
          model: params.model,
          ...(serviceTier ? { serviceTier } : {}),
          effort,
          collaborationMode: {
            mode: "default",
            settings: {
              model: params.model,
              reasoning_effort: effort,
              developer_instructions: null,
            },
          },
        },
        { timeoutMs: appServer.requestTimeoutMs, signal: params.opts?.abortSignal },
      ),
    );
    turnId = turnResponse.turn.id;
    // Binding the ids also replays any notifications buffered so far.
    collector.setTurn(childThreadId, turnId);

    const text = await collector.wait({
      signal: params.opts?.abortSignal,
      timeoutMs: Math.max(
        appServer.turnCompletionIdleTimeoutMs,
        SIDE_QUESTION_COMPLETION_TIMEOUT_MS,
      ),
    });
    const trimmed = text.trim();
    if (!trimmed) {
      throw new Error("Codex /btw completed without an answer.");
    }
    return { text: trimmed };
  } finally {
    removeNotificationHandler();
    removeRequestHandler();
    await cleanupCodexSideThread(client, {
      threadId: childThreadId,
      turnId,
      interrupt: !collector.completed,
      timeoutMs: appServer.requestTimeoutMs,
    });
  }
}
/**
 * Collects the streamed answer of a single side-question turn.
 *
 * Notifications are buffered until {@link setTurn} supplies the thread and
 * turn ids, then replayed in arrival order. Only notifications matching that
 * thread/turn pair are processed; `account/rateLimits/updated` is recorded
 * regardless of turn so later error messages can include rate-limit details.
 */
class CodexSideQuestionCollector {
  private threadId: string | undefined;
  private turnId: string | undefined;
  private pendingNotifications: CodexServerNotification[] = [];
  private assistantStarted = false;
  private assistantText = "";
  private finalText: string | undefined;
  private terminalError: Error | undefined;
  private latestRateLimits: JsonValue | undefined;
  private settle:
    | {
        resolve: (text: string) => void;
        reject: (error: Error) => void;
      }
    | undefined;
  /** True once a `turn/completed` notification for the tracked turn was seen. */
  completed = false;

  constructor(private readonly params: AgentHarnessSideQuestionParams) {}

  /** Binds the collector to a turn and replays everything buffered so far. */
  setTurn(threadId: string, turnId: string): void {
    this.threadId = threadId;
    this.turnId = turnId;
    const pending = this.pendingNotifications;
    this.pendingNotifications = [];
    for (const notification of pending) {
      this.handleNotification(notification);
    }
  }

  /**
   * Routes one app-server notification.
   *
   * Handling is fully synchronous so state always reflects arrival order,
   * including during the replay loop in {@link setTurn}.
   */
  handleNotification(notification: CodexServerNotification): void {
    const params = isJsonObject(notification.params) ? notification.params : undefined;
    if (!params) {
      return;
    }
    if (notification.method === "account/rateLimits/updated") {
      this.latestRateLimits = params;
      rememberCodexRateLimits(params);
      return;
    }
    if (!this.threadId || !this.turnId) {
      // Turn ids are not known yet; keep the notification for replay.
      this.pendingNotifications.push(notification);
      return;
    }
    if (!isNotificationForTurn(params, this.threadId, this.turnId)) {
      return;
    }
    if (notification.method === "item/agentMessage/delta") {
      this.appendAssistantDelta(params);
      return;
    }
    if (notification.method === "turn/completed") {
      this.completeFromTurn(params);
      return;
    }
    if (
      notification.method === "error" &&
      readBooleanAlias(params, ["willRetry", "will_retry"]) !== true
    ) {
      this.reject(formatCodexErrorMessage(params, this.latestRateLimits));
    }
  }

  /**
   * Resolves with the answer text once the turn completes.
   *
   * Settles immediately when a terminal error or completion was already
   * recorded, or when the signal is already aborted. Otherwise rejects on
   * abort or after `timeoutMs` (clamped to at least 100 ms).
   */
  wait(options: { signal?: AbortSignal; timeoutMs: number }): Promise<string> {
    if (this.terminalError) {
      return Promise.reject(this.terminalError);
    }
    if (this.completed) {
      return Promise.resolve(this.finalText ?? this.assistantText);
    }
    if (options.signal?.aborted) {
      return Promise.reject(new Error("Codex /btw was aborted."));
    }
    return new Promise<string>((resolve, reject) => {
      let timeout: ReturnType<typeof setTimeout> | undefined;
      const cleanup = () => {
        if (timeout) {
          clearTimeout(timeout);
          timeout = undefined;
        }
        options.signal?.removeEventListener("abort", abort);
      };
      const abort = () => {
        cleanup();
        this.settle = undefined;
        reject(new Error("Codex /btw was aborted."));
      };
      timeout = setTimeout(
        () => {
          cleanup();
          this.settle = undefined;
          reject(new Error("Codex /btw timed out waiting for the side thread to finish."));
        },
        Math.max(100, options.timeoutMs),
      );
      // Do not keep the process alive just for this timer.
      timeout.unref?.();
      options.signal?.addEventListener("abort", abort, { once: true });
      this.settle = {
        resolve: (text) => {
          cleanup();
          resolve(text);
        },
        reject: (error) => {
          cleanup();
          reject(error);
        },
      };
    });
  }

  /**
   * Appends a streamed delta to the running answer.
   *
   * The text is appended synchronously, before the start callback runs.
   * Awaiting the callback first would let later deltas or `turn/completed`
   * handled in the same tick overtake the first delta, reordering or dropping
   * fallback text.
   */
  private appendAssistantDelta(params: JsonObject): void {
    const delta = readString(params, "delta") ?? "";
    if (!delta) {
      return;
    }
    this.assistantText += delta;
    if (this.assistantStarted) {
      return;
    }
    this.assistantStarted = true;
    this.notifyAssistantStarted();
  }

  /**
   * Fires `opts.onAssistantMessageStart` without blocking notification
   * handling. A throwing or rejecting callback is logged at debug level
   * instead of surfacing as an unhandled rejection.
   */
  private notifyAssistantStarted(): void {
    const notify = async (): Promise<void> => {
      await this.params.opts?.onAssistantMessageStart?.();
    };
    void notify().catch((error: unknown) => {
      embeddedAgentLog.debug("codex /btw assistant start callback failed", { error });
    });
  }

  /**
   * Settles the collector from a `turn/completed` payload for the tracked turn.
   * Failed and interrupted turns reject; otherwise the last agent message of
   * the turn wins, falling back to the accumulated delta text.
   */
  private completeFromTurn(params: JsonObject): void {
    const notification = readCodexTurnCompletedNotification(params);
    const turn = notification?.turn;
    if (!turn || turn.id !== this.turnId) {
      return;
    }
    this.completed = true;
    if (turn.status === "failed") {
      this.reject(
        formatCodexUsageLimitErrorMessage({
          message: turn.error?.message,
          codexErrorInfo: turn.error?.codexErrorInfo as JsonValue | null | undefined,
          rateLimits: this.latestRateLimits ?? readRecentCodexRateLimits(),
        }) ??
          turn.error?.message ??
          "Codex /btw side thread failed.",
      );
      return;
    }
    if (turn.status === "interrupted") {
      this.reject("Codex /btw side thread was interrupted.");
      return;
    }
    const finalText = collectAssistantText(turn) || this.assistantText;
    this.resolve(finalText);
  }

  /** Records the final text and settles a pending {@link wait}, if any. */
  private resolve(text: string): void {
    this.finalText = text;
    const settle = this.settle;
    this.settle = undefined;
    settle?.resolve(text);
  }

  /** Records a terminal error and rejects a pending {@link wait}, if any. */
  private reject(error: string | Error): void {
    this.terminalError = error instanceof Error ? error : new Error(error);
    const settle = this.settle;
    this.settle = undefined;
    settle?.reject(this.terminalError);
  }
}
params.error : undefined; + const message = + formatCodexUsageLimitErrorMessage({ + message: error ? readString(error, "message") : undefined, + codexErrorInfo: error?.codexErrorInfo, + rateLimits: latestRateLimits ?? readRecentCodexRateLimits(), + }) ?? + (error ? (readString(error, "message") ?? readString(error, "error")) : undefined) ?? + readString(params, "message") ?? + "Codex /btw side thread failed."; + return new Error(formatErrorMessage(message)); +} diff --git a/extensions/codex/src/app-server/thread-lifecycle.ts b/extensions/codex/src/app-server/thread-lifecycle.ts index 663d6d8b7c1..fc18109f36f 100644 --- a/extensions/codex/src/app-server/thread-lifecycle.ts +++ b/extensions/codex/src/app-server/thread-lifecycle.ts @@ -44,7 +44,7 @@ export type CodexPluginThreadConfigProvider = { build: () => Promise; }; -const CODEX_CODE_MODE_THREAD_CONFIG: JsonObject = { +export const CODEX_CODE_MODE_THREAD_CONFIG: JsonObject = { "features.code_mode": true, "features.code_mode_only": true, }; @@ -348,7 +348,7 @@ export function buildThreadResumeParams( }; } -function buildCodexRuntimeThreadConfig(config: JsonObject | undefined): JsonObject { +export function buildCodexRuntimeThreadConfig(config: JsonObject | undefined): JsonObject { return ( mergeCodexThreadConfigs(config, CODEX_CODE_MODE_THREAD_CONFIG) ?? 
{ ...CODEX_CODE_MODE_THREAD_CONFIG, @@ -528,7 +528,7 @@ function buildUserInput( ]; } -function resolveCodexAppServerModelProvider(params: { +export function resolveCodexAppServerModelProvider(params: { provider: string; authProfileId?: string; authProfileStore?: CodexAppServerAuthProfileLookup["authProfileStore"]; diff --git a/src/agents/btw.test.ts b/src/agents/btw.test.ts index cabe5aabe88..d2e4359b93a 100644 --- a/src/agents/btw.test.ts +++ b/src/agents/btw.test.ts @@ -95,6 +95,7 @@ vi.mock("../logging/diagnostic.js", () => ({ })); const { runBtwSideQuestion } = await import("./btw.js"); +const { clearAgentHarnesses, registerAgentHarness } = await import("./harness/registry.js"); type RunBtwSideQuestionParams = Parameters[0]; const DEFAULT_AGENT_DIR = "/tmp/agent"; @@ -345,6 +346,7 @@ describe("runBtwSideQuestion", () => { prepareProviderRuntimeAuthMock.mockReset(); registerProviderStreamForModelMock.mockReset(); diagDebugMock.mockReset(); + clearAgentHarnesses(); readFileMock.mockResolvedValue("mock transcript"); parseSessionEntriesMock.mockReturnValue([ @@ -471,6 +473,63 @@ describe("runBtwSideQuestion", () => { expect(ensureArgs?.[2]).toEqual({ workspaceDir: "/tmp/workspace" }); }); + it("routes Codex-selected BTW questions through the harness side-question hook", async () => { + const codexSideQuestionMock = vi.fn().mockResolvedValue({ text: "Codex side answer." }); + registerAgentHarness({ + id: "codex", + label: "Codex test harness", + supports: () => ({ supported: true, priority: 100 }), + runAttempt: vi.fn(), + runSideQuestion: codexSideQuestionMock, + }); + resolveModelWithRegistryMock.mockReturnValue({ + provider: "openai", + id: "gpt-5.5", + api: "openai-responses", + }); + resolveSessionAuthProfileOverrideMock.mockResolvedValue("openai-codex:work"); + + const result = await runSideQuestion({ + provider: "openai", + model: "gpt-5.5", + sessionKey: DEFAULT_SESSION_KEY, + }); + + expect(result).toEqual({ text: "Codex side answer." 
}); + expect(codexSideQuestionMock).toHaveBeenCalledTimes(1); + expect(codexSideQuestionMock.mock.calls[0]?.[0]).toMatchObject({ + provider: "openai", + model: "gpt-5.5", + question: DEFAULT_QUESTION, + sessionId: "session-1", + agentId: "main", + workspaceDir: "/tmp/workspace", + authProfileId: "openai-codex:work", + }); + expect(codexSideQuestionMock.mock.calls[0]?.[0].sessionFile).toContain("session-1.jsonl"); + expect(streamSimpleMock).not.toHaveBeenCalled(); + expect(registerProviderStreamForModelMock).not.toHaveBeenCalled(); + }); + + it("does not fall back to the direct provider call when Codex lacks BTW support", async () => { + registerAgentHarness({ + id: "codex", + label: "Codex test harness", + supports: () => ({ supported: true, priority: 100 }), + runAttempt: vi.fn(), + }); + + await expect( + runSideQuestion({ + provider: "openai", + model: "gpt-5.5", + sessionKey: DEFAULT_SESSION_KEY, + }), + ).rejects.toThrow('Selected agent harness "codex" does not support /btw side questions.'); + expect(streamSimpleMock).not.toHaveBeenCalled(); + expect(registerProviderStreamForModelMock).not.toHaveBeenCalled(); + }); + it("applies provider runtime auth before streaming github-copilot BTW questions", async () => { resolveModelWithRegistryMock.mockReturnValue({ provider: "github-copilot", diff --git a/src/agents/btw.ts b/src/agents/btw.ts index 7b10fb9af36..d3693eb49fe 100644 --- a/src/agents/btw.ts +++ b/src/agents/btw.ts @@ -17,7 +17,7 @@ import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; import { resolveAgentWorkspaceDir, resolveSessionAgentId } from "./agent-scope.js"; import { resolveSessionAuthProfileOverride } from "./auth-profiles/session-override.js"; import { readBtwTranscriptMessages, resolveBtwSessionTranscriptPath } from "./btw-transcript.js"; -import { resolveAgentHarnessPolicy } from "./harness/selection.js"; +import { resolveAgentHarnessPolicy, selectAgentHarness } from "./harness/selection.js"; import { 
resolveImageSanitizationLimits, type ImageSanitizationLimits, @@ -307,6 +307,47 @@ export async function runBtwSideQuestion( throw new Error("No active session transcript."); } + const sessionAgentId = resolveSessionAgentId({ + sessionKey: params.sessionKey, + config: params.cfg, + }); + const workspaceDir = resolveAgentWorkspaceDir(params.cfg, sessionAgentId); + const harness = selectAgentHarness({ + provider: params.provider, + modelId: params.model, + config: params.cfg, + agentId: sessionAgentId, + sessionKey: params.sessionKey, + }); + if (harness.runSideQuestion) { + const { authProfileId, authProfileIdSource } = await resolveRuntimeModel({ + cfg: params.cfg, + provider: params.provider, + model: params.model, + agentId: sessionAgentId, + agentDir: params.agentDir, + workspaceDir, + sessionEntry: params.sessionEntry, + sessionStore: params.sessionStore, + sessionKey: params.sessionKey, + storePath: params.storePath, + isNewSession: params.isNewSession, + }); + const result = await harness.runSideQuestion({ + ...params, + sessionId, + sessionFile, + agentId: sessionAgentId, + workspaceDir, + authProfileId, + authProfileIdSource, + }); + return { text: result.text }; + } + if (harness.id !== "pi") { + throw new Error(`Selected agent harness "${harness.id}" does not support /btw side questions.`); + } + const activeRunSnapshot = getActiveEmbeddedRunSnapshot(sessionId); const imageLimits = resolveImageSanitizationLimits(params.cfg); let messages: Message[] = []; @@ -334,11 +375,6 @@ export async function runBtwSideQuestion( throw new Error("No active session context."); } - const sessionAgentId = resolveSessionAgentId({ - sessionKey: params.sessionKey, - config: params.cfg, - }); - const workspaceDir = resolveAgentWorkspaceDir(params.cfg, sessionAgentId); const { model, authProfileId } = await resolveRuntimeModel({ cfg: params.cfg, provider: params.provider, diff --git a/src/agents/harness/types.ts b/src/agents/harness/types.ts index 08a7f7b1061..4e29dd66dee 
100644 --- a/src/agents/harness/types.ts +++ b/src/agents/harness/types.ts @@ -12,6 +12,32 @@ export type AgentHarnessAttemptParams = import("../pi-embedded-runner/run/types.js").EmbeddedRunAttemptParams; export type AgentHarnessAttemptResult = import("../pi-embedded-runner/run/types.js").EmbeddedRunAttemptResult; +export type AgentHarnessSideQuestionParams = { + cfg: import("../../config/types.openclaw.js").OpenClawConfig; + agentDir: string; + provider: string; + model: string; + question: string; + sessionEntry: import("../../config/sessions.js").SessionEntry; + sessionStore?: Record; + sessionKey?: string; + storePath?: string; + resolvedThinkLevel?: import("../../auto-reply/thinking.js").ThinkLevel; + resolvedReasoningLevel: import("../../auto-reply/thinking.js").ReasoningLevel; + blockReplyChunking?: import("../pi-embedded-block-chunker.js").BlockReplyChunking; + resolvedBlockStreamingBreak?: "text_end" | "message_end"; + opts?: import("../../auto-reply/get-reply-options.types.js").GetReplyOptions; + isNewSession: boolean; + sessionId: string; + sessionFile: string; + agentId?: string; + workspaceDir?: string; + authProfileId?: string; + authProfileIdSource?: "auto" | "user"; +}; +export type AgentHarnessSideQuestionResult = { + text: string; +}; export type AgentHarnessCompactParams = import("../pi-embedded-runner/compact.types.js").CompactEmbeddedPiSessionParams; export type AgentHarnessCompactResult = @@ -42,6 +68,7 @@ export type AgentHarness = { deliveryDefaults?: AgentHarnessDeliveryDefaults; supports(ctx: AgentHarnessSupportContext): AgentHarnessSupport; runAttempt(params: AgentHarnessAttemptParams): Promise; + runSideQuestion?(params: AgentHarnessSideQuestionParams): Promise; classify?( result: AgentHarnessAttemptResult, ctx: AgentHarnessAttemptParams, diff --git a/src/plugin-sdk/agent-harness-runtime.ts b/src/plugin-sdk/agent-harness-runtime.ts index 743cc05f4a5..99f84e92c29 100644 --- a/src/plugin-sdk/agent-harness-runtime.ts +++ 
b/src/plugin-sdk/agent-harness-runtime.ts @@ -25,6 +25,8 @@ export type { AgentHarnessCompactResult, AgentHarnessDeliveryDefaults, AgentHarnessResultClassification, + AgentHarnessSideQuestionParams, + AgentHarnessSideQuestionResult, AgentHarnessResetParams, AgentHarnessSupport, AgentHarnessSupportContext,