diff --git a/extensions/acpx/src/runtime-internals/events.test.ts b/extensions/acpx/src/runtime-internals/events.test.ts index 018d1a3c3c7..0736ba4063c 100644 --- a/extensions/acpx/src/runtime-internals/events.test.ts +++ b/extensions/acpx/src/runtime-internals/events.test.ts @@ -5,9 +5,24 @@ function jsonLine(payload: unknown): string { return JSON.stringify(payload); } +function beginPrompt(projector: PromptStreamProjector, id = "req-1") { + projector.ingestLine( + jsonLine({ + jsonrpc: "2.0", + id, + method: "session/prompt", + params: { + sessionId: "session-1", + prompt: [{ type: "text", text: "hello" }], + }, + }), + ); +} + describe("PromptStreamProjector", () => { it("maps agent message chunks to output deltas", () => { const projector = new PromptStreamProjector(); + beginPrompt(projector); const event = projector.ingestLine( jsonLine({ jsonrpc: "2.0", @@ -34,6 +49,7 @@ describe("PromptStreamProjector", () => { it("preserves leading spaces in streamed output chunks", () => { const projector = new PromptStreamProjector(); + beginPrompt(projector); const event = projector.ingestLine( jsonLine({ jsonrpc: "2.0", @@ -60,6 +76,7 @@ describe("PromptStreamProjector", () => { it("maps agent thought chunks to thought deltas", () => { const projector = new PromptStreamProjector(); + beginPrompt(projector); const event = projector.ingestLine( jsonLine({ jsonrpc: "2.0", @@ -86,6 +103,7 @@ describe("PromptStreamProjector", () => { it("maps tool call updates to tool_call events", () => { const projector = new PromptStreamProjector(); + beginPrompt(projector); const event = projector.ingestLine( jsonLine({ jsonrpc: "2.0", @@ -108,19 +126,53 @@ describe("PromptStreamProjector", () => { }); }); - it("maps prompt response stop reasons to done events", () => { + it("ignores replayed updates before current prompt starts", () => { const projector = new PromptStreamProjector(); - projector.ingestLine( + const replayed = projector.ingestLine( jsonLine({ jsonrpc: "2.0", - id: "req-1", - method: "session/prompt", + method: "session/update", params: { sessionId: "session-1", - prompt: [{ type: "text", text: "hello" }], + update: { + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: "old turn", + }, + }, }, }), ); + beginPrompt(projector, "req-2"); + const current = projector.ingestLine( + jsonLine({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "session-1", + update: { + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: "new turn", + }, + }, + }, + }), + ); + + expect(replayed).toBeNull(); + expect(current).toEqual({ + type: "text_delta", + text: "new turn", + stream: "output", + }); + }); + + it("maps prompt response stop reasons to done events", () => { + const projector = new PromptStreamProjector(); + beginPrompt(projector); const event = projector.ingestLine( jsonLine({ jsonrpc: "2.0", @@ -139,17 +191,7 @@ describe("PromptStreamProjector", () => { it("maps json-rpc errors to runtime errors", () => { const projector = new PromptStreamProjector(); - projector.ingestLine( - jsonLine({ - jsonrpc: "2.0", - id: "req-1", - method: "session/prompt", - params: { - sessionId: "session-1", - prompt: [{ type: "text", text: "hello" }], - }, - }), - ); + beginPrompt(projector); const event = projector.ingestLine( jsonLine({ jsonrpc: "2.0", @@ -170,17 +212,7 @@ describe("PromptStreamProjector", () => { it("ignores non-prompt response errors", () => { const projector = new PromptStreamProjector(); - projector.ingestLine( - jsonLine({ - jsonrpc: "2.0", - id: 3, - method: "session/prompt", - params: { - sessionId: "session-1", - prompt: [{ type: "text", text: "hello" }], - }, - }), - ); + beginPrompt(projector, "3"); const loadError = projector.ingestLine( jsonLine({ jsonrpc: "2.0", @@ -200,11 +232,25 @@ describe("PromptStreamProjector", () => { }, }), ); + const trailingReplay = projector.ingestLine( + jsonLine({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "session-1", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "should be ignored" }, + }, + }, + }), + ); expect(loadError).toBeNull(); expect(promptDone).toEqual({ type: "done", stopReason: "end_turn", }); + expect(trailingReplay).toBeNull(); }); }); diff --git a/extensions/acpx/src/runtime-internals/events.ts b/extensions/acpx/src/runtime-internals/events.ts index bb13e2f3c78..2e66076f595 100644 --- a/extensions/acpx/src/runtime-internals/events.ts +++ b/extensions/acpx/src/runtime-internals/events.ts @@ -180,11 +180,6 @@ export class PromptStreamProjector { return null; } - const updateEvent = parseSessionUpdateEvent(parsed); - if (updateEvent) { - return updateEvent; - } - if (asTrimmedString(parsed.method) === "session/prompt") { const id = normalizeJsonRpcId(parsed.id); if (id) { @@ -193,8 +188,13 @@ export class PromptStreamProjector { return null; } + const updateEvent = parseSessionUpdateEvent(parsed); + if (updateEvent) { + return this.promptRequestIds.size > 0 ? updateEvent : null; + } + if (Object.hasOwn(parsed, "error")) { - if (!this.shouldHandlePromptResponse(parsed)) { + if (!this.consumePromptResponse(parsed)) { return null; } const error = isRecord(parsed.error) ? parsed.error : null; @@ -211,7 +211,7 @@ export class PromptStreamProjector { } const stopReason = parsePromptStopReason(parsed); - if (!stopReason || !this.shouldHandlePromptResponse(parsed)) { + if (!stopReason || !this.consumePromptResponse(parsed)) { return null; } @@ -221,11 +221,15 @@ export class PromptStreamProjector { }; } - private shouldHandlePromptResponse(message: Record): boolean { + private consumePromptResponse(message: Record): boolean { const id = normalizeJsonRpcId(message.id); if (!id) { return false; } - return this.promptRequestIds.has(id); + if (!this.promptRequestIds.has(id)) { + return false; + } + this.promptRequestIds.delete(id); + return true; } }