diff --git a/extensions/codex/src/app-server/event-projector.test.ts b/extensions/codex/src/app-server/event-projector.test.ts index ba74ef87e9b..fbf9f8f7597 100644 --- a/extensions/codex/src/app-server/event-projector.test.ts +++ b/extensions/codex/src/app-server/event-projector.test.ts @@ -1,7 +1,15 @@ -import type { Api, Model } from "@mariozechner/pi-ai"; import type { EmbeddedRunAttemptParams } from "openclaw/plugin-sdk/agent-harness"; import { describe, expect, it, vi } from "vitest"; -import { CodexAppServerEventProjector } from "./event-projector.js"; +import { + CodexAppServerEventProjector, + type CodexAppServerToolTelemetry, +} from "./event-projector.js"; +import { createCodexTestModel } from "./test-support.js"; + +const THREAD_ID = "thread-1"; +const TURN_ID = "turn-1"; + +type ProjectorNotification = Parameters<CodexAppServerEventProjector["handleNotification"]>[0]; function createParams(): EmbeddedRunAttemptParams { return { @@ -9,45 +17,67 @@ function createParams(): EmbeddedRunAttemptParams { sessionId: "session-1", provider: "openai-codex", modelId: "gpt-5.4-codex", - model: { - id: "gpt-5.4-codex", - name: "gpt-5.4-codex", - provider: "openai-codex", - api: "openai-codex-responses", - input: ["text"], - reasoning: true, - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, - contextWindow: 128_000, - maxTokens: 8_000, - } as Model, + model: createCodexTestModel(), thinkLevel: "medium", } as unknown as EmbeddedRunAttemptParams; } -describe("CodexAppServerEventProjector", () => { - it("projects assistant deltas and usage into embedded attempt results", async () => { - const onAssistantMessageStart = vi.fn(); - const onPartialReply = vi.fn(); - const params = { +function createProjector(params = createParams()): CodexAppServerEventProjector { + return new CodexAppServerEventProjector(params, THREAD_ID, TURN_ID); +} + +function createProjectorWithAssistantHooks() { + const onAssistantMessageStart = vi.fn(); + const onPartialReply = vi.fn(); + return { + onAssistantMessageStart, + 
onPartialReply, + projector: createProjector({ ...createParams(), onAssistantMessageStart, onPartialReply, - }; - const projector = new CodexAppServerEventProjector(params, "thread-1", "turn-1"); + }), + }; +} - await projector.handleNotification({ - method: "item/agentMessage/delta", - params: { threadId: "thread-1", turnId: "turn-1", itemId: "msg-1", delta: "hel" }, - }); - await projector.handleNotification({ - method: "item/agentMessage/delta", - params: { threadId: "thread-1", turnId: "turn-1", itemId: "msg-1", delta: "lo" }, - }); - await projector.handleNotification({ - method: "thread/tokenUsage/updated", - params: { - threadId: "thread-1", - turnId: "turn-1", +function buildEmptyToolTelemetry(): CodexAppServerToolTelemetry { + return { + didSendViaMessagingTool: false, + messagingToolSentTexts: [], + messagingToolSentMediaUrls: [], + messagingToolSentTargets: [], + }; +} + +function forCurrentTurn( + method: ProjectorNotification["method"], + params: Record<string, unknown>, +): ProjectorNotification { + return { + method, + params: { threadId: THREAD_ID, turnId: TURN_ID, ...params }, + } as ProjectorNotification; +} + +function agentMessageDelta(delta: string, itemId = "msg-1"): ProjectorNotification { + return forCurrentTurn("item/agentMessage/delta", { itemId, delta }); +} + +function turnCompleted(items: unknown[] = []): ProjectorNotification { + return forCurrentTurn("turn/completed", { + turn: { id: TURN_ID, status: "completed", items }, + }); +} + +describe("CodexAppServerEventProjector", () => { + it("projects assistant deltas and usage into embedded attempt results", async () => { + const { onAssistantMessageStart, onPartialReply, projector } = + createProjectorWithAssistantHooks(); + + await projector.handleNotification(agentMessageDelta("hel")); + await projector.handleNotification(agentMessageDelta("lo")); + await projector.handleNotification( + forCurrentTurn("thread/tokenUsage/updated", { tokenUsage: { total: { totalTokens: 900_000, @@ -62,27 +92,13 @@ 
describe("CodexAppServerEventProjector", () => { outputTokens: 7, }, }, - }, - }); - await projector.handleNotification({ - method: "turn/completed", - params: { - threadId: "thread-1", - turnId: "turn-1", - turn: { - id: "turn-1", - status: "completed", - items: [{ type: "agentMessage", id: "msg-1", text: "hello" }], - }, - }, - }); + }), + ); + await projector.handleNotification( + turnCompleted([{ type: "agentMessage", id: "msg-1", text: "hello" }]), + ); - const result = projector.buildResult({ - didSendViaMessagingTool: false, - messagingToolSentTexts: [], - messagingToolSentMediaUrls: [], - messagingToolSentTargets: [], - }); + const result = projector.buildResult(buildEmptyToolTelemetry()); expect(onAssistantMessageStart).toHaveBeenCalledTimes(1); expect(onPartialReply).not.toHaveBeenCalled(); @@ -100,18 +116,11 @@ describe("CodexAppServerEventProjector", () => { }); it("does not treat cumulative-only token usage as fresh context usage", async () => { - const params = createParams(); - const projector = new CodexAppServerEventProjector(params, "thread-1", "turn-1"); + const projector = createProjector(); - await projector.handleNotification({ - method: "item/agentMessage/delta", - params: { threadId: "thread-1", turnId: "turn-1", itemId: "msg-1", delta: "done" }, - }); - await projector.handleNotification({ - method: "thread/tokenUsage/updated", - params: { - threadId: "thread-1", - turnId: "turn-1", + await projector.handleNotification(agentMessageDelta("done")); + await projector.handleNotification( + forCurrentTurn("thread/tokenUsage/updated", { tokenUsage: { total: { totalTokens: 1_000_000, @@ -120,15 +129,10 @@ describe("CodexAppServerEventProjector", () => { outputTokens: 500, }, }, - }, - }); + }), + ); - const result = projector.buildResult({ - didSendViaMessagingTool: false, - messagingToolSentTexts: [], - messagingToolSentMediaUrls: [], - messagingToolSentTargets: [], - }); + const result = projector.buildResult(buildEmptyToolTelemetry()); 
expect(result.assistantTexts).toEqual(["done"]); expect(result.attemptUsage).toBeUndefined(); @@ -141,18 +145,11 @@ describe("CodexAppServerEventProjector", () => { }); it("normalizes snake_case current token usage fields", async () => { - const params = createParams(); - const projector = new CodexAppServerEventProjector(params, "thread-1", "turn-1"); + const projector = createProjector(); - await projector.handleNotification({ - method: "item/agentMessage/delta", - params: { threadId: "thread-1", turnId: "turn-1", itemId: "msg-1", delta: "done" }, - }); - await projector.handleNotification({ - method: "thread/tokenUsage/updated", - params: { - threadId: "thread-1", - turnId: "turn-1", + await projector.handleNotification(agentMessageDelta("done")); + await projector.handleNotification( + forCurrentTurn("thread/tokenUsage/updated", { tokenUsage: { total: { total_tokens: 1_000_000 }, last_token_usage: { @@ -162,15 +159,10 @@ describe("CodexAppServerEventProjector", () => { output_tokens: 9, }, }, - }, - }); + }), + ); - const result = projector.buildResult({ - didSendViaMessagingTool: false, - messagingToolSentTexts: [], - messagingToolSentMediaUrls: [], - messagingToolSentTargets: [], - }); + const result = projector.buildResult(buildEmptyToolTelemetry()); expect(result.attemptUsage).toMatchObject({ input: 8, output: 9, cacheRead: 3, total: 20 }); expect(result.lastAssistant?.usage).toMatchObject({ @@ -182,63 +174,37 @@ describe("CodexAppServerEventProjector", () => { }); it("keeps intermediate agentMessage items out of the final visible reply", async () => { - const onAssistantMessageStart = vi.fn(); - const onPartialReply = vi.fn(); - const params = { - ...createParams(), - onAssistantMessageStart, - onPartialReply, - }; - const projector = new CodexAppServerEventProjector(params, "thread-1", "turn-1"); + const { onAssistantMessageStart, onPartialReply, projector } = + createProjectorWithAssistantHooks(); - await projector.handleNotification({ - method: 
"item/agentMessage/delta", - params: { - threadId: "thread-1", - turnId: "turn-1", - itemId: "msg-commentary", - delta: "checking thread context; then post a tight progress reply here.", - }, - }); - await projector.handleNotification({ - method: "item/agentMessage/delta", - params: { - threadId: "thread-1", - turnId: "turn-1", - itemId: "msg-final", - delta: "release fixes first. please drop affected PRs, failing checks, and blockers here.", - }, - }); - await projector.handleNotification({ - method: "turn/completed", - params: { - threadId: "thread-1", - turnId: "turn-1", - turn: { - id: "turn-1", - status: "completed", - items: [ - { - type: "agentMessage", - id: "msg-commentary", - text: "checking thread context; then post a tight progress reply here.", - }, - { - type: "agentMessage", - id: "msg-final", - text: "release fixes first. please drop affected PRs, failing checks, and blockers here.", - }, - ], + await projector.handleNotification( + agentMessageDelta( + "checking thread context; then post a tight progress reply here.", + "msg-commentary", + ), + ); + await projector.handleNotification( + agentMessageDelta( + "release fixes first. please drop affected PRs, failing checks, and blockers here.", + "msg-final", + ), + ); + await projector.handleNotification( + turnCompleted([ + { + type: "agentMessage", + id: "msg-commentary", + text: "checking thread context; then post a tight progress reply here.", }, - }, - }); + { + type: "agentMessage", + id: "msg-final", + text: "release fixes first. 
please drop affected PRs, failing checks, and blockers here.", + }, + ]), + ); - const result = projector.buildResult({ - didSendViaMessagingTool: false, - messagingToolSentTexts: [], - messagingToolSentMediaUrls: [], - messagingToolSentTargets: [], - }); + const result = projector.buildResult(buildEmptyToolTelemetry()); expect(onAssistantMessageStart).toHaveBeenCalledTimes(1); expect(onPartialReply).not.toHaveBeenCalled(); @@ -255,36 +221,21 @@ describe("CodexAppServerEventProjector", () => { }); it("ignores notifications for other turns", async () => { - const params = createParams(); - const projector = new CodexAppServerEventProjector(params, "thread-1", "turn-1"); + const projector = createProjector(); await projector.handleNotification({ method: "item/agentMessage/delta", - params: { threadId: "thread-1", turnId: "turn-2", itemId: "msg-1", delta: "wrong" }, + params: { threadId: THREAD_ID, turnId: "turn-2", itemId: "msg-1", delta: "wrong" }, }); - const result = projector.buildResult({ - didSendViaMessagingTool: false, - messagingToolSentTexts: [], - messagingToolSentMediaUrls: [], - messagingToolSentTargets: [], - }); + const result = projector.buildResult(buildEmptyToolTelemetry()); expect(result.assistantTexts).toEqual([]); }); it("preserves sessions_yield detection in attempt results", () => { - const params = createParams(); - const projector = new CodexAppServerEventProjector(params, "thread-1", "turn-1"); + const projector = createProjector(); - const result = projector.buildResult( - { - didSendViaMessagingTool: false, - messagingToolSentTexts: [], - messagingToolSentMediaUrls: [], - messagingToolSentTargets: [], - }, - { yieldDetected: true }, - ); + const result = projector.buildResult(buildEmptyToolTelemetry(), { yieldDetected: true }); expect(result.yieldDetected).toBe(true); }); @@ -299,71 +250,45 @@ describe("CodexAppServerEventProjector", () => { onReasoningEnd, onAgentEvent, }; - const projector = new CodexAppServerEventProjector(params, 
"thread-1", "turn-1"); + const projector = createProjector(params); - await projector.handleNotification({ - method: "item/reasoning/textDelta", - params: { threadId: "thread-1", turnId: "turn-1", itemId: "reason-1", delta: "thinking" }, - }); - await projector.handleNotification({ - method: "item/plan/delta", - params: { threadId: "thread-1", turnId: "turn-1", itemId: "plan-1", delta: "- inspect\n" }, - }); - await projector.handleNotification({ - method: "turn/plan/updated", - params: { - threadId: "thread-1", - turnId: "turn-1", + await projector.handleNotification( + forCurrentTurn("item/reasoning/textDelta", { itemId: "reason-1", delta: "thinking" }), + ); + await projector.handleNotification( + forCurrentTurn("item/plan/delta", { itemId: "plan-1", delta: "- inspect\n" }), + ); + await projector.handleNotification( + forCurrentTurn("turn/plan/updated", { explanation: "next", plan: [{ step: "patch", status: "in_progress" }], - }, - }); - await projector.handleNotification({ - method: "item/started", - params: { - threadId: "thread-1", - turnId: "turn-1", + }), + ); + await projector.handleNotification( + forCurrentTurn("item/started", { item: { type: "contextCompaction", id: "compact-1" }, - }, - }); + }), + ); expect(projector.isCompacting()).toBe(true); - await projector.handleNotification({ - method: "item/completed", - params: { - threadId: "thread-1", - turnId: "turn-1", + await projector.handleNotification( + forCurrentTurn("item/completed", { item: { type: "contextCompaction", id: "compact-1" }, - }, - }); + }), + ); expect(projector.isCompacting()).toBe(false); - await projector.handleNotification({ - method: "item/completed", - params: { - threadId: "thread-1", - turnId: "turn-1", + await projector.handleNotification( + forCurrentTurn("item/completed", { item: { type: "dynamicToolCall", id: "tool-1", tool: "sessions_send", status: "completed", }, - }, - }); - await projector.handleNotification({ - method: "turn/completed", - params: { - threadId: 
"thread-1", - turnId: "turn-1", - turn: { id: "turn-1", status: "completed", items: [] }, - }, - }); + }), + ); + await projector.handleNotification(turnCompleted()); - const result = projector.buildResult({ - didSendViaMessagingTool: false, - messagingToolSentTexts: [], - messagingToolSentMediaUrls: [], - messagingToolSentTargets: [], - }); + const result = projector.buildResult(buildEmptyToolTelemetry()); expect(onReasoningStream).toHaveBeenCalledWith({ text: "thinking" }); expect(onReasoningEnd).toHaveBeenCalledTimes(1); diff --git a/extensions/codex/src/app-server/run-attempt.test.ts b/extensions/codex/src/app-server/run-attempt.test.ts index c1da7a97c1a..91e5e61811b 100644 --- a/extensions/codex/src/app-server/run-attempt.test.ts +++ b/extensions/codex/src/app-server/run-attempt.test.ts @@ -1,7 +1,6 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import type { Api, Model } from "@mariozechner/pi-ai"; import { abortAgentHarnessRun, queueAgentHarnessMessage, @@ -11,6 +10,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { CodexServerNotification } from "./protocol.js"; import { runCodexAppServerAttempt, __testing } from "./run-attempt.js"; import { writeCodexAppServerBinding } from "./session-binding.js"; +import { createCodexTestModel } from "./test-support.js"; import { buildThreadResumeParams, buildTurnStartParams, @@ -29,17 +29,7 @@ function createParams(sessionFile: string, workspaceDir: string): EmbeddedRunAtt runId: "run-1", provider: "codex", modelId: "gpt-5.4-codex", - model: { - id: "gpt-5.4-codex", - name: "gpt-5.4-codex", - provider: "codex", - api: "openai-codex-responses", - input: ["text"], - reasoning: true, - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, - contextWindow: 128_000, - maxTokens: 8_000, - } as Model, + model: createCodexTestModel("codex"), thinkLevel: "medium", disableTools: true, timeoutMs: 5_000, @@ -253,10 +243,7 @@ 
describe("runCodexAppServerAttempt", () => { path.join(tempDir, "session.jsonl"), path.join(tempDir, "workspace"), ); - params.model = { - ...params.model, - input: ["text", "image"], - } as Model; + params.model = createCodexTestModel("codex", ["text", "image"]); params.images = [ { type: "image", diff --git a/extensions/codex/src/app-server/test-support.ts b/extensions/codex/src/app-server/test-support.ts index acf3f47704b..15c56445549 100644 --- a/extensions/codex/src/app-server/test-support.ts +++ b/extensions/codex/src/app-server/test-support.ts @@ -1,8 +1,23 @@ import { EventEmitter } from "node:events"; import { PassThrough, Writable } from "node:stream"; +import type { Api, Model } from "@mariozechner/pi-ai"; import { vi } from "vitest"; import { CodexAppServerClient } from "./client.js"; +export function createCodexTestModel(provider = "openai-codex", input = ["text"]): Model { + return { + id: "gpt-5.4-codex", + name: "gpt-5.4-codex", + provider, + api: "openai-codex-responses", + input, + reasoning: true, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 128_000, + maxTokens: 8_000, + } as Model; +} + export function createClientHarness() { const stdout = new PassThrough(); const writes: string[] = [];