Files
openclaw/extensions/codex/app-server/event-projector.ts
2026-04-10 21:22:16 +01:00

608 lines
19 KiB
TypeScript

import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { AssistantMessage, Usage } from "@mariozechner/pi-ai";
import {
formatErrorMessage,
normalizeUsage,
type NormalizedUsage,
type EmbeddedRunAttemptParams,
type EmbeddedRunAttemptResult,
type MessagingToolSend,
} from "openclaw/plugin-sdk/agent-harness";
import {
isJsonObject,
type CodexServerNotification,
type CodexThreadItem,
type CodexTurn,
type JsonObject,
type JsonValue,
} from "./protocol.js";
export type CodexAppServerToolTelemetry = {
didSendViaMessagingTool: boolean;
messagingToolSentTexts: string[];
messagingToolSentMediaUrls: string[];
messagingToolSentTargets: MessagingToolSend[];
toolMediaUrls?: string[];
toolAudioAsVoice?: boolean;
successfulCronAdds?: number;
};
const ZERO_USAGE: Usage = {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
total: 0,
},
};
export class CodexAppServerEventProjector {
private readonly assistantTextByItem = new Map<string, string>();
private readonly reasoningTextByItem = new Map<string, string>();
private readonly planTextByItem = new Map<string, string>();
private readonly activeItemIds = new Set<string>();
private readonly completedItemIds = new Set<string>();
private readonly activeCompactionItemIds = new Set<string>();
private readonly toolMetas = new Map<string, { toolName: string; meta?: string }>();
private assistantStarted = false;
private reasoningStarted = false;
private reasoningEnded = false;
private completedTurn: CodexTurn | undefined;
private promptError: unknown;
private promptErrorSource: EmbeddedRunAttemptResult["promptErrorSource"] = null;
private aborted = false;
private tokenUsage: NormalizedUsage | undefined;
private guardianReviewCount = 0;
private completedCompactionCount = 0;
constructor(
private readonly params: EmbeddedRunAttemptParams,
private readonly threadId: string,
private readonly turnId: string,
) {}
async handleNotification(notification: CodexServerNotification): Promise<void> {
const params = isJsonObject(notification.params) ? notification.params : undefined;
if (!params || !this.isNotificationForTurn(params)) {
return;
}
switch (notification.method) {
case "item/agentMessage/delta":
await this.handleAssistantDelta(params);
break;
case "item/reasoning/summaryTextDelta":
case "item/reasoning/textDelta":
await this.handleReasoningDelta(params);
break;
case "item/plan/delta":
this.handlePlanDelta(params);
break;
case "turn/plan/updated":
this.handleTurnPlanUpdated(params);
break;
case "item/started":
this.handleItemStarted(params);
break;
case "item/completed":
this.handleItemCompleted(params);
break;
case "item/autoApprovalReview/started":
case "item/autoApprovalReview/completed":
this.guardianReviewCount += 1;
this.params.onAgentEvent?.({
stream: "codex_app_server.guardian",
data: { method: notification.method },
});
break;
case "thread/tokenUsage/updated":
this.handleTokenUsage(params);
break;
case "turn/completed":
await this.handleTurnCompleted(params);
break;
case "error":
this.promptError = readString(params, "message") ?? "codex app-server error";
this.promptErrorSource = "prompt";
break;
default:
break;
}
}
buildResult(
toolTelemetry: CodexAppServerToolTelemetry,
options?: { yieldDetected?: boolean },
): EmbeddedRunAttemptResult {
const assistantTexts = this.collectAssistantTexts();
const lastAssistant =
assistantTexts.length > 0
? this.createAssistantMessage(assistantTexts.join("\n\n"))
: undefined;
const messagesSnapshot: AgentMessage[] = [
{
role: "user",
content: this.params.prompt,
timestamp: Date.now(),
},
];
if (lastAssistant) {
messagesSnapshot.push(lastAssistant);
}
const turnFailed = this.completedTurn?.status === "failed";
const turnInterrupted = this.completedTurn?.status === "interrupted";
const promptError =
this.promptError ??
(turnFailed ? (this.completedTurn?.error?.message ?? "codex app-server turn failed") : null);
return {
aborted: this.aborted || turnInterrupted,
timedOut: false,
idleTimedOut: false,
timedOutDuringCompaction: false,
promptError,
promptErrorSource: promptError ? this.promptErrorSource || "prompt" : null,
sessionIdUsed: this.params.sessionId,
bootstrapPromptWarningSignaturesSeen: this.params.bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature: this.params.bootstrapPromptWarningSignature,
messagesSnapshot,
assistantTexts,
toolMetas: [...this.toolMetas.values()],
lastAssistant,
didSendViaMessagingTool: toolTelemetry.didSendViaMessagingTool,
messagingToolSentTexts: toolTelemetry.messagingToolSentTexts,
messagingToolSentMediaUrls: toolTelemetry.messagingToolSentMediaUrls,
messagingToolSentTargets: toolTelemetry.messagingToolSentTargets,
toolMediaUrls: toolTelemetry.toolMediaUrls,
toolAudioAsVoice: toolTelemetry.toolAudioAsVoice,
successfulCronAdds: toolTelemetry.successfulCronAdds,
cloudCodeAssistFormatError: false,
attemptUsage: this.tokenUsage,
replayMetadata: {
hadPotentialSideEffects: toolTelemetry.didSendViaMessagingTool,
replaySafe: !toolTelemetry.didSendViaMessagingTool,
},
itemLifecycle: {
startedCount: this.activeItemIds.size + this.completedItemIds.size,
completedCount: this.completedItemIds.size,
activeCount: this.activeItemIds.size,
...(this.completedCompactionCount > 0
? { compactionCount: this.completedCompactionCount }
: {}),
},
yieldDetected: options?.yieldDetected || false,
didSendDeterministicApprovalPrompt: this.guardianReviewCount > 0 ? false : undefined,
};
}
markTimedOut(): void {
this.aborted = true;
this.promptError = "codex app-server attempt timed out";
this.promptErrorSource = "prompt";
}
isCompacting(): boolean {
return this.activeCompactionItemIds.size > 0;
}
private async handleAssistantDelta(params: JsonObject): Promise<void> {
const itemId = readString(params, "itemId") ?? readString(params, "id") ?? "assistant";
const delta = readString(params, "delta") ?? "";
if (!delta) {
return;
}
if (!this.assistantStarted) {
this.assistantStarted = true;
await this.params.onAssistantMessageStart?.();
}
const text = `${this.assistantTextByItem.get(itemId) ?? ""}${delta}`;
this.assistantTextByItem.set(itemId, text);
await this.params.onPartialReply?.({ text });
}
private async handleReasoningDelta(params: JsonObject): Promise<void> {
const itemId = readString(params, "itemId") ?? readString(params, "id") ?? "reasoning";
const delta = readString(params, "delta") ?? "";
if (!delta) {
return;
}
this.reasoningStarted = true;
this.reasoningTextByItem.set(itemId, `${this.reasoningTextByItem.get(itemId) ?? ""}${delta}`);
await this.params.onReasoningStream?.({ text: delta });
}
private handlePlanDelta(params: JsonObject): void {
const itemId = readString(params, "itemId") ?? readString(params, "id") ?? "plan";
const delta = readString(params, "delta") ?? "";
if (!delta) {
return;
}
const text = `${this.planTextByItem.get(itemId) ?? ""}${delta}`;
this.planTextByItem.set(itemId, text);
this.emitPlanUpdate({ explanation: undefined, steps: splitPlanText(text) });
}
private handleTurnPlanUpdated(params: JsonObject): void {
const plan = Array.isArray(params.plan)
? params.plan.flatMap((entry) => {
if (!isJsonObject(entry)) {
return [];
}
const step = readString(entry, "step");
const status = readString(entry, "status");
if (!step) {
return [];
}
return status ? [`${step} (${status})`] : [step];
})
: undefined;
this.emitPlanUpdate({
explanation: readNullableString(params, "explanation"),
steps: plan,
});
}
private handleItemStarted(params: JsonObject): void {
const item = readItem(params.item);
const itemId = item?.id ?? readString(params, "itemId") ?? readString(params, "id");
if (itemId) {
this.activeItemIds.add(itemId);
}
if (item?.type === "contextCompaction" && itemId) {
this.activeCompactionItemIds.add(itemId);
this.params.onAgentEvent?.({
stream: "compaction",
data: {
phase: "start",
backend: "codex-app-server",
threadId: this.threadId,
turnId: this.turnId,
itemId,
},
});
}
this.emitStandardItemEvent({ phase: "start", item });
this.params.onAgentEvent?.({
stream: "codex_app_server.item",
data: { phase: "started", itemId, type: item?.type },
});
}
private handleItemCompleted(params: JsonObject): void {
const item = readItem(params.item);
const itemId = item?.id ?? readString(params, "itemId") ?? readString(params, "id");
if (itemId) {
this.activeItemIds.delete(itemId);
this.completedItemIds.add(itemId);
}
if (item?.type === "agentMessage" && typeof item.text === "string" && item.text) {
this.assistantTextByItem.set(item.id, item.text);
}
if (item?.type === "plan" && typeof item.text === "string" && item.text) {
this.planTextByItem.set(item.id, item.text);
this.emitPlanUpdate({ explanation: undefined, steps: splitPlanText(item.text) });
}
if (item?.type === "contextCompaction" && itemId) {
this.activeCompactionItemIds.delete(itemId);
this.completedCompactionCount += 1;
this.params.onAgentEvent?.({
stream: "compaction",
data: {
phase: "end",
backend: "codex-app-server",
threadId: this.threadId,
turnId: this.turnId,
itemId,
},
});
}
this.recordToolMeta(item);
this.emitStandardItemEvent({ phase: "end", item });
this.params.onAgentEvent?.({
stream: "codex_app_server.item",
data: { phase: "completed", itemId, type: item?.type },
});
}
private handleTokenUsage(params: JsonObject): void {
const tokenUsage = isJsonObject(params.tokenUsage) ? params.tokenUsage : undefined;
const total = tokenUsage && isJsonObject(tokenUsage.total) ? tokenUsage.total : undefined;
if (!total) {
return;
}
this.tokenUsage = normalizeUsage({
input: readNumber(total, "inputTokens"),
output: readNumber(total, "outputTokens"),
cacheRead: readNumber(total, "cachedInputTokens"),
total: readNumber(total, "totalTokens"),
});
}
private async handleTurnCompleted(params: JsonObject): Promise<void> {
const turn = readTurn(params.turn);
if (!turn || turn.id !== this.turnId) {
return;
}
this.completedTurn = turn;
if (turn.status === "interrupted") {
this.aborted = true;
}
if (turn.status === "failed") {
this.promptError = turn.error?.message ?? "codex app-server turn failed";
this.promptErrorSource = "prompt";
}
for (const item of turn.items ?? []) {
if (item.type === "agentMessage" && typeof item.text === "string" && item.text) {
this.assistantTextByItem.set(item.id, item.text);
}
if (item.type === "plan" && typeof item.text === "string" && item.text) {
this.planTextByItem.set(item.id, item.text);
this.emitPlanUpdate({ explanation: undefined, steps: splitPlanText(item.text) });
}
this.recordToolMeta(item);
}
this.activeCompactionItemIds.clear();
await this.maybeEndReasoning();
}
private async maybeEndReasoning(): Promise<void> {
if (!this.reasoningStarted || this.reasoningEnded) {
return;
}
this.reasoningEnded = true;
await this.params.onReasoningEnd?.();
}
private emitPlanUpdate(params: { explanation?: string | null; steps?: string[] }): void {
if (!params.explanation && (!params.steps || params.steps.length === 0)) {
return;
}
this.params.onAgentEvent?.({
stream: "plan",
data: {
phase: "update",
title: "Plan updated",
source: "codex-app-server",
...(params.explanation ? { explanation: params.explanation } : {}),
...(params.steps && params.steps.length > 0 ? { steps: params.steps } : {}),
},
});
}
private emitStandardItemEvent(params: {
phase: "start" | "end";
item: CodexThreadItem | undefined;
}): void {
const { item } = params;
if (!item) {
return;
}
const kind = itemKind(item);
if (!kind) {
return;
}
this.params.onAgentEvent?.({
stream: "item",
data: {
itemId: item.id,
phase: params.phase,
kind,
title: itemTitle(item),
status: params.phase === "start" ? "running" : itemStatus(item),
...(itemName(item) ? { name: itemName(item) } : {}),
...(itemMeta(item) ? { meta: itemMeta(item) } : {}),
},
});
}
private recordToolMeta(item: CodexThreadItem | undefined): void {
if (!item) {
return;
}
const toolName = itemName(item);
if (!toolName) {
return;
}
this.toolMetas.set(item.id, {
toolName,
...(itemMeta(item) ? { meta: itemMeta(item) } : {}),
});
}
private collectAssistantTexts(): string[] {
return [...this.assistantTextByItem.values()].filter((text) => text.trim().length > 0);
}
private createAssistantMessage(text: string): AssistantMessage {
const usage: Usage = this.tokenUsage
? {
input: this.tokenUsage.input ?? 0,
output: this.tokenUsage.output ?? 0,
cacheRead: this.tokenUsage.cacheRead ?? 0,
cacheWrite: this.tokenUsage.cacheWrite ?? 0,
totalTokens:
this.tokenUsage.total ??
(this.tokenUsage.input ?? 0) +
(this.tokenUsage.output ?? 0) +
(this.tokenUsage.cacheRead ?? 0) +
(this.tokenUsage.cacheWrite ?? 0),
cost: ZERO_USAGE.cost,
}
: ZERO_USAGE;
return {
role: "assistant",
content: [{ type: "text", text }],
api: this.params.model.api ?? "openai-codex-responses",
provider: this.params.provider,
model: this.params.modelId,
usage,
stopReason: this.aborted ? "aborted" : this.promptError ? "error" : "stop",
errorMessage: this.promptError ? formatErrorMessage(this.promptError) : undefined,
timestamp: Date.now(),
};
}
private isNotificationForTurn(params: JsonObject): boolean {
const threadId = readString(params, "threadId");
const turnId = readString(params, "turnId");
return (!threadId || threadId === this.threadId) && (!turnId || turnId === this.turnId);
}
}
function readString(record: JsonObject, key: string): string | undefined {
const value = record[key];
return typeof value === "string" ? value : undefined;
}
function readNullableString(record: JsonObject, key: string): string | null | undefined {
const value = record[key];
if (value === null) {
return null;
}
return typeof value === "string" ? value : undefined;
}
function readNumber(record: JsonObject, key: string): number | undefined {
const value = record[key];
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}
function splitPlanText(text: string): string[] {
return text
.split(/\r?\n/)
.map((line) => line.trim().replace(/^[-*]\s+/, ""))
.filter((line) => line.length > 0);
}
function itemKind(
item: CodexThreadItem,
): "tool" | "command" | "patch" | "search" | "analysis" | undefined {
switch (item.type) {
case "dynamicToolCall":
case "mcpToolCall":
return "tool";
case "commandExecution":
return "command";
case "fileChange":
return "patch";
case "webSearch":
return "search";
case "reasoning":
case "contextCompaction":
return "analysis";
default:
return undefined;
}
}
function itemTitle(item: CodexThreadItem): string {
switch (item.type) {
case "commandExecution":
return "Command";
case "fileChange":
return "File change";
case "mcpToolCall":
return "MCP tool";
case "dynamicToolCall":
return "Tool";
case "webSearch":
return "Web search";
case "contextCompaction":
return "Context compaction";
case "reasoning":
return "Reasoning";
default:
return item.type;
}
}
function itemStatus(item: CodexThreadItem): "completed" | "failed" | "running" {
const status = readItemString(item, "status");
if (status === "failed") {
return "failed";
}
if (status === "inProgress" || status === "running") {
return "running";
}
return "completed";
}
function itemName(item: CodexThreadItem): string | undefined {
if (item.type === "dynamicToolCall" && typeof item.tool === "string") {
return item.tool;
}
if (item.type === "mcpToolCall" && typeof item.tool === "string") {
const server = typeof item.server === "string" ? item.server : undefined;
return server ? `${server}.${item.tool}` : item.tool;
}
if (item.type === "commandExecution") {
return "bash";
}
if (item.type === "fileChange") {
return "apply_patch";
}
if (item.type === "webSearch") {
return "web_search";
}
return undefined;
}
function itemMeta(item: CodexThreadItem): string | undefined {
if (item.type === "commandExecution" && typeof item.command === "string") {
return item.command;
}
if (item.type === "webSearch" && typeof item.query === "string") {
return item.query;
}
return readItemString(item, "status");
}
function readItemString(item: CodexThreadItem, key: string): string | undefined {
const value = (item as Record<string, unknown>)[key];
return typeof value === "string" ? value : undefined;
}
function readItem(value: JsonValue | undefined): CodexThreadItem | undefined {
if (!isJsonObject(value)) {
return undefined;
}
const type = typeof value.type === "string" ? value.type : undefined;
const id = typeof value.id === "string" ? value.id : undefined;
if (!type || !id) {
return undefined;
}
return value as CodexThreadItem;
}
function readTurn(value: JsonValue | undefined): CodexTurn | undefined {
if (!isJsonObject(value)) {
return undefined;
}
const id = typeof value.id === "string" ? value.id : undefined;
const status = typeof value.status === "string" ? value.status : undefined;
if (!id || !status) {
return undefined;
}
const items = Array.isArray(value.items)
? value.items.flatMap((item) => {
const parsed = readItem(item);
return parsed ? [parsed] : [];
})
: undefined;
return {
id,
status: status as CodexTurn["status"],
error: isJsonObject(value.error)
? {
message: typeof value.error.message === "string" ? value.error.message : undefined,
}
: null,
items,
};
}