fix: keep Codex projector events isolated (#69072) (thanks @ayeshakhalid192007-dev)

This commit is contained in:
Peter Steinberger
2026-04-20 23:42:25 +01:00
parent f2f27775fb
commit 25428c4631
2 changed files with 47 additions and 7 deletions

View File

@@ -314,4 +314,34 @@ describe("CodexAppServerEventProjector", () => {
expect(JSON.stringify(result.messagesSnapshot[2])).toContain("Codex plan");
expect(result.itemLifecycle).toMatchObject({ compactionCount: 1 });
});
it("continues projecting turn completion when an event consumer throws", async () => {
const onAgentEvent = vi.fn(() => {
throw new Error("consumer failed");
});
const projector = createProjector({
...createParams(),
onAgentEvent,
});
await expect(
projector.handleNotification(
turnCompleted([
{ type: "plan", id: "plan-1", text: "step one\nstep two" },
{ type: "agentMessage", id: "msg-1", text: "final answer" },
]),
),
).resolves.toBeUndefined();
const result = projector.buildResult(buildEmptyToolTelemetry());
expect(onAgentEvent).toHaveBeenCalledWith(
expect.objectContaining({
stream: "plan",
data: expect.objectContaining({ steps: ["step one", "step two"] }),
}),
);
expect(result.assistantTexts).toEqual(["final answer"]);
expect(JSON.stringify(result.messagesSnapshot)).toContain("Codex plan");
});
});

View File

@@ -106,7 +106,7 @@ export class CodexAppServerEventProjector {
case "item/autoApprovalReview/started":
case "item/autoApprovalReview/completed":
this.guardianReviewCount += 1;
this.params.onAgentEvent?.({
this.emitAgentEvent({
stream: "codex_app_server.guardian",
data: { method: notification.method },
});
@@ -279,7 +279,7 @@ export class CodexAppServerEventProjector {
}
if (item?.type === "contextCompaction" && itemId) {
this.activeCompactionItemIds.add(itemId);
this.params.onAgentEvent?.({
this.emitAgentEvent({
stream: "compaction",
data: {
phase: "start",
@@ -291,7 +291,7 @@ export class CodexAppServerEventProjector {
});
}
this.emitStandardItemEvent({ phase: "start", item });
this.params.onAgentEvent?.({
this.emitAgentEvent({
stream: "codex_app_server.item",
data: { phase: "started", itemId, type: item?.type },
});
@@ -315,7 +315,7 @@ export class CodexAppServerEventProjector {
if (item?.type === "contextCompaction" && itemId) {
this.activeCompactionItemIds.delete(itemId);
this.completedCompactionCount += 1;
this.params.onAgentEvent?.({
this.emitAgentEvent({
stream: "compaction",
data: {
phase: "end",
@@ -328,7 +328,7 @@ export class CodexAppServerEventProjector {
}
this.recordToolMeta(item);
this.emitStandardItemEvent({ phase: "end", item });
this.params.onAgentEvent?.({
this.emitAgentEvent({
stream: "codex_app_server.item",
data: { phase: "completed", itemId, type: item?.type },
});
@@ -388,7 +388,7 @@ export class CodexAppServerEventProjector {
if (!params.explanation && (!params.steps || params.steps.length === 0)) {
return;
}
this.params.onAgentEvent?.({
this.emitAgentEvent({
stream: "plan",
data: {
phase: "update",
@@ -412,7 +412,7 @@ export class CodexAppServerEventProjector {
if (!kind) {
return;
}
this.params.onAgentEvent?.({
this.emitAgentEvent({
stream: "item",
data: {
itemId: item.id,
@@ -440,6 +440,16 @@ export class CodexAppServerEventProjector {
});
}
private emitAgentEvent(
event: Parameters<NonNullable<EmbeddedRunAttemptParams["onAgentEvent"]>>[0],
): void {
try {
this.params.onAgentEvent?.(event);
} catch {
// Downstream event consumers must not corrupt the canonical Codex turn projection.
}
}
private collectAssistantTexts(): string[] {
const finalText = this.resolveFinalAssistantText();
return finalText ? [finalText] : [];