diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index ef6a3855b2d..b3f07682f0f 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -1010,6 +1010,22 @@ function createEmbeddedLifecycleTerminalBackstop(params: { runId: string; sessio return { emit, note }; } +function emitModelFallbackStepLifecycle(params: { + runId: string; + sessionKey?: string; + step: Record; +}) { + emitAgentEvent({ + runId: params.runId, + ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}), + stream: "lifecycle", + data: { + phase: "fallback_step", + ...params.step, + }, + }); +} + export async function runAgentTurnWithFallback(params: { commandBody: string; transcriptCommandBody?: string; @@ -1362,6 +1378,13 @@ export async function runAgentTurnWithFallback(params: { runId, sessionId: params.followupRun.run.sessionId, lane: runLane, + onFallbackStep: (step) => { + emitModelFallbackStepLifecycle({ + runId, + sessionKey: params.sessionKey, + step, + }); + }, classifyResult: async ({ result, provider, model }) => { const classification = outcomePlan.classifyRunResult({ result, diff --git a/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts b/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts index a9effdbcd73..6e25aaf1fb8 100644 --- a/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts +++ b/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts @@ -764,8 +764,16 @@ describe("runReplyAgent typing (heartbeat)", () => { payloads: [{ text: "final" }], meta: {}, }); - vi.spyOn(modelFallbackModule, "runWithModelFallback").mockImplementationOnce( - async ({ run }: { run: (provider: string, model: string) => Promise }) => ({ + vi.spyOn(modelFallbackModule, "runWithModelFallback").mockImplementationOnce(async (args) => { + const { run, onFallbackStep } = args; + await onFallbackStep?.({ + fallbackStepType: "fallback_step", + fallbackStepFromModel: "fireworks/fireworks/accounts/fireworks/routers/kimi-k2p5-turbo", + fallbackStepToModel: "deepinfra/moonshotai/Kimi-K2.5", + fallbackStepFromFailureReason: "rate_limit", + fallbackStepFinalOutcome: "succeeded", + }); + return { result: await run("deepinfra", "moonshotai/Kimi-K2.5"), provider: "deepinfra", model: "moonshotai/Kimi-K2.5", @@ -777,8 +785,8 @@ describe("runReplyAgent typing (heartbeat)", () => { reason: "rate_limit", }, ], - }), - ); + }; + }); const { run } = createMinimalRun({ resolvedVerboseLevel: testCase.verbose, @@ -809,6 +817,7 @@ describe("runReplyAgent typing (heartbeat)", () => { phases.filter((phase) => phase === "fallback"), testCase.name, ).toHaveLength(1); + expect(phases, testCase.name).toContain("fallback_step"); } }); diff --git a/src/tui/embedded-backend.test.ts b/src/tui/embedded-backend.test.ts index 9a96510adc9..787ed58c729 100644 --- a/src/tui/embedded-backend.test.ts +++ b/src/tui/embedded-backend.test.ts @@ -303,6 +303,77 @@ describe("EmbeddedTuiBackend", () => { }); }); + it("keeps a fallback response deliverable after a retryable lifecycle error", async () => { + const { EmbeddedTuiBackend } = await import("./embedded-backend.js"); + const pending = deferred<{ + payloads: Array<{ text: string }>; + meta: Record; + }>(); + agentCommandFromIngressMock.mockReturnValueOnce(pending.promise); + + const backend = new EmbeddedTuiBackend(); + const events: Array<{ event: string; payload: unknown }> = []; + backend.onEvent = (evt) => { + events.push({ event: evt.event, payload: evt.payload }); + }; + + backend.start(); + await backend.sendChat({ + sessionKey: "agent:main:main", + message: "recover after timeout", + runId: "run-local-fallback", + }); + + registeredListener?.({ + runId: "run-local-fallback", + stream: "lifecycle", + data: { phase: "error", error: "primary model timed out" }, + }); + await flushMicrotasks(); + expect( + events.some( + (entry) => + entry.event === "chat" && (entry.payload as { state?: string }).state === "error", + ), + ).toBe(false); + + registeredListener?.({ + runId: "run-local-fallback", + stream: "lifecycle", + data: { + phase: "fallback_step", + fallbackStepFinalOutcome: "succeeded", + fallbackStepFromModel: "anthropic/claude-sonnet-4-6", + fallbackStepToModel: "anthropic/claude-sonnet-4-5", + }, + }); + registeredListener?.({ + runId: "run-local-fallback", + stream: "assistant", + data: { text: "fallback answer", delta: "fallback answer" }, + }); + registeredListener?.({ + runId: "run-local-fallback", + stream: "lifecycle", + data: { phase: "end", stopReason: "stop" }, + }); + + pending.resolve({ payloads: [{ text: "fallback answer" }], meta: {} }); + await flushMicrotasks(); + vi.advanceTimersByTime(15_001); + + const chatPayloads = events + .filter((entry) => entry.event === "chat") + .map((entry) => entry.payload as { state?: string; message?: { content?: unknown } }); + expect(chatPayloads.some((payload) => payload.state === "error")).toBe(false); + expect(chatPayloads.at(-1)).toMatchObject({ + state: "final", + message: { + content: [{ text: "fallback answer" }], + }, + }); + }); + it("emits side-result events for local /btw runs", async () => { const { EmbeddedTuiBackend } = await import("./embedded-backend.js"); agentCommandFromIngressMock.mockResolvedValueOnce({ diff --git a/src/tui/embedded-backend.ts b/src/tui/embedded-backend.ts index fdcfd5fbfe3..55b6bf86a4a 100644 --- a/src/tui/embedded-backend.ts +++ b/src/tui/embedded-backend.ts @@ -66,6 +66,8 @@ type LocalRunState = { registered: boolean; }; +const LIFECYCLE_ERROR_RETRY_GRACE_MS = 15_000; + const silentRuntime = { log: (..._args: unknown[]) => undefined, error: (..._args: unknown[]) => undefined, @@ -118,6 +120,7 @@ export class EmbeddedTuiBackend implements TuiBackend { private previousRuntimeLog?: typeof defaultRuntime.log; private previousRuntimeError?: typeof defaultRuntime.error; private seq = 0; + private readonly pendingLifecycleErrors = new Map>(); start() { if (this.unsubscribe) { @@ -144,6 +147,7 @@ export class EmbeddedTuiBackend implements TuiBackend { for (const run of this.runs.values()) { run.controller.abort(); } + this.clearPendingLifecycleErrors(); this.runs.clear(); defaultRuntime.log = this.previousRuntimeLog ?? defaultRuntime.log; defaultRuntime.error = this.previousRuntimeError ?? defaultRuntime.error; @@ -358,6 +362,32 @@ export class EmbeddedTuiBackend implements TuiBackend { }); } + private clearPendingLifecycleError(runId: string) { + const pending = this.pendingLifecycleErrors.get(runId); + if (!pending) { + return; + } + clearTimeout(pending); + this.pendingLifecycleErrors.delete(runId); + } + + private clearPendingLifecycleErrors() { + for (const pending of this.pendingLifecycleErrors.values()) { + clearTimeout(pending); + } + this.pendingLifecycleErrors.clear(); + } + + private scheduleChatError(runId: string, run: LocalRunState, errorMessage?: string) { + this.clearPendingLifecycleError(runId); + const timer = setTimeout(() => { + this.pendingLifecycleErrors.delete(runId); + this.emitChatError(runId, run, errorMessage); + }, LIFECYCLE_ERROR_RETRY_GRACE_MS); + timer.unref?.(); + this.pendingLifecycleErrors.set(runId, timer); + } + private emitChatDelta(runId: string, run: LocalRunState) { const projected = projectLiveAssistantBufferedText(run.buffer.trim(), { suppressLeadFragments: true, @@ -380,6 +410,7 @@ export class EmbeddedTuiBackend implements TuiBackend { } private emitChatFinal(runId: string, run: LocalRunState, stopReason?: string) { + this.clearPendingLifecycleError(runId); if (run.finalSent) { return; } @@ -408,6 +439,7 @@ export class EmbeddedTuiBackend implements TuiBackend { } private emitChatAborted(runId: string, run: LocalRunState) { + this.clearPendingLifecycleError(runId); if (run.finalSent) { return; } @@ -421,6 +453,7 @@ export class EmbeddedTuiBackend implements TuiBackend { } private emitChatError(runId: string, run: LocalRunState, errorMessage?: string) { + this.clearPendingLifecycleError(runId); if (run.finalSent) { return; } @@ -457,6 +490,12 @@ export class EmbeddedTuiBackend implements TuiBackend { return; } + const lifecyclePhase = + evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : ""; + if (evt.stream !== "lifecycle" || lifecyclePhase !== "error") { + this.clearPendingLifecycleError(evt.runId); + } + if (evt.stream !== "assistant") { this.ensureRunRegistered(evt.runId, run); } @@ -490,7 +529,7 @@ export class EmbeddedTuiBackend implements TuiBackend { return; } - const phase = typeof evt.data?.phase === "string" ? evt.data.phase : ""; + const phase = lifecyclePhase; const aborted = evt.data?.aborted === true || run.controller.signal.aborted; if (phase === "end") { if (aborted) { @@ -511,7 +550,8 @@ export class EmbeddedTuiBackend implements TuiBackend { return; } const errorMessage = typeof evt.data?.error === "string" ? evt.data.error : undefined; - this.emitChatError(evt.runId, run, errorMessage); + run.buffer = ""; + this.scheduleChatError(evt.runId, run, errorMessage); } }