fix fallback timeout response delivery

This commit is contained in:
Andy Ye
2026-05-09 16:02:26 -07:00
committed by Peter Steinberger
parent 93e8389148
commit 2e38e92229
4 changed files with 149 additions and 6 deletions

View File

@@ -1010,6 +1010,22 @@ function createEmbeddedLifecycleTerminalBackstop(params: { runId: string; sessio
return { emit, note };
}
/**
 * Publishes a model-fallback step on the agent "lifecycle" stream.
 *
 * The step's fields are copied into the event data alongside a fixed
 * `phase: "fallback_step"` marker. `sessionKey` is attached to the event only
 * when it is a non-empty string.
 *
 * @param params.runId - Run the event is reported for.
 * @param params.sessionKey - Optional session key forwarded on the event.
 * @param params.step - Step fields merged into the event data.
 */
function emitModelFallbackStepLifecycle(params: {
  runId: string;
  sessionKey?: string;
  step: Record<string, unknown>;
}): void {
  emitAgentEvent({
    runId: params.runId,
    ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
    stream: "lifecycle",
    data: {
      ...params.step,
      // Written after the spread on purpose: `step` is an open record, and a
      // stray `phase` key in it must not replace the marker that lifecycle
      // listeners branch on.
      phase: "fallback_step",
    },
  });
}
export async function runAgentTurnWithFallback(params: {
commandBody: string;
transcriptCommandBody?: string;
@@ -1362,6 +1378,13 @@ export async function runAgentTurnWithFallback(params: {
runId,
sessionId: params.followupRun.run.sessionId,
lane: runLane,
onFallbackStep: (step) => {
emitModelFallbackStepLifecycle({
runId,
sessionKey: params.sessionKey,
step,
});
},
classifyResult: async ({ result, provider, model }) => {
const classification = outcomePlan.classifyRunResult({
result,

View File

@@ -764,8 +764,16 @@ describe("runReplyAgent typing (heartbeat)", () => {
payloads: [{ text: "final" }],
meta: {},
});
vi.spyOn(modelFallbackModule, "runWithModelFallback").mockImplementationOnce(
async ({ run }: { run: (provider: string, model: string) => Promise<unknown> }) => ({
vi.spyOn(modelFallbackModule, "runWithModelFallback").mockImplementationOnce(async (args) => {
const { run, onFallbackStep } = args;
await onFallbackStep?.({
fallbackStepType: "fallback_step",
fallbackStepFromModel: "fireworks/fireworks/accounts/fireworks/routers/kimi-k2p5-turbo",
fallbackStepToModel: "deepinfra/moonshotai/Kimi-K2.5",
fallbackStepFromFailureReason: "rate_limit",
fallbackStepFinalOutcome: "succeeded",
});
return {
result: await run("deepinfra", "moonshotai/Kimi-K2.5"),
provider: "deepinfra",
model: "moonshotai/Kimi-K2.5",
@@ -777,8 +785,8 @@ describe("runReplyAgent typing (heartbeat)", () => {
reason: "rate_limit",
},
],
}),
);
};
});
const { run } = createMinimalRun({
resolvedVerboseLevel: testCase.verbose,
@@ -809,6 +817,7 @@ describe("runReplyAgent typing (heartbeat)", () => {
phases.filter((phase) => phase === "fallback"),
testCase.name,
).toHaveLength(1);
expect(phases, testCase.name).toContain("fallback_step");
}
});

View File

@@ -303,6 +303,77 @@ describe("EmbeddedTuiBackend", () => {
});
});
// Regression test: a lifecycle "error" event must not immediately surface as a
// chat error. When fallback events and a final response follow, the run must
// finish with a "final" chat event carrying the fallback text and never emit
// an "error" chat event.
it("keeps a fallback response deliverable after a retryable lifecycle error", async () => {
const { EmbeddedTuiBackend } = await import("./embedded-backend.js");
// Keep the agent command pending so lifecycle events can be injected while the
// run is still in flight.
const pending = deferred<{
payloads: Array<{ text: string }>;
meta: Record<string, unknown>;
}>();
agentCommandFromIngressMock.mockReturnValueOnce(pending.promise);
const backend = new EmbeddedTuiBackend();
// Record every event the backend emits so chat states can be inspected later.
const events: Array<{ event: string; payload: unknown }> = [];
backend.onEvent = (evt) => {
events.push({ event: evt.event, payload: evt.payload });
};
backend.start();
await backend.sendChat({
sessionKey: "agent:main:main",
message: "recover after timeout",
runId: "run-local-fallback",
});
// Simulate the primary model failing with a lifecycle error.
// NOTE(review): registeredListener is presumably the agent-event listener
// captured by the suite's mocks — confirm in the test setup.
registeredListener?.({
runId: "run-local-fallback",
stream: "lifecycle",
data: { phase: "error", error: "primary model timed out" },
});
await flushMicrotasks();
// The error must not have been reported as a chat error at this point.
expect(
events.some(
(entry) =>
entry.event === "chat" && (entry.payload as { state?: string }).state === "error",
),
).toBe(false);
// Simulate the fallback: a fallback_step lifecycle event, streamed assistant
// text, then a normal lifecycle end.
registeredListener?.({
runId: "run-local-fallback",
stream: "lifecycle",
data: {
phase: "fallback_step",
fallbackStepFinalOutcome: "succeeded",
fallbackStepFromModel: "anthropic/claude-sonnet-4-6",
fallbackStepToModel: "anthropic/claude-sonnet-4-5",
},
});
registeredListener?.({
runId: "run-local-fallback",
stream: "assistant",
data: { text: "fallback answer", delta: "fallback answer" },
});
registeredListener?.({
runId: "run-local-fallback",
stream: "lifecycle",
data: { phase: "end", stopReason: "stop" },
});
pending.resolve({ payloads: [{ text: "fallback answer" }], meta: {} });
await flushMicrotasks();
// Advance just past 15 seconds so a still-scheduled deferred error would fire.
// NOTE(review): assumes fake timers are enabled by the suite setup and that
// 15s matches the backend's error grace window — confirm.
vi.advanceTimersByTime(15_001);
const chatPayloads = events
.filter((entry) => entry.event === "chat")
.map((entry) => entry.payload as { state?: string; message?: { content?: unknown } });
// No chat error at any point, and the last chat event is the fallback answer.
expect(chatPayloads.some((payload) => payload.state === "error")).toBe(false);
expect(chatPayloads.at(-1)).toMatchObject({
state: "final",
message: {
content: [{ text: "fallback answer" }],
},
});
});
it("emits side-result events for local /btw runs", async () => {
const { EmbeddedTuiBackend } = await import("./embedded-backend.js");
agentCommandFromIngressMock.mockResolvedValueOnce({

View File

@@ -66,6 +66,8 @@ type LocalRunState = {
registered: boolean;
};
// Delay, in milliseconds, between receiving a lifecycle "error" event and
// reporting it as a chat error. scheduleChatError passes it to setTimeout, and
// the pending timer is cancelled if any other event for the run arrives first.
const LIFECYCLE_ERROR_RETRY_GRACE_MS = 15_000;
const silentRuntime = {
log: (..._args: unknown[]) => undefined,
error: (..._args: unknown[]) => undefined,
@@ -118,6 +120,7 @@ export class EmbeddedTuiBackend implements TuiBackend {
private previousRuntimeLog?: typeof defaultRuntime.log;
private previousRuntimeError?: typeof defaultRuntime.error;
private seq = 0;
private readonly pendingLifecycleErrors = new Map<string, ReturnType<typeof setTimeout>>();
start() {
if (this.unsubscribe) {
@@ -144,6 +147,7 @@ export class EmbeddedTuiBackend implements TuiBackend {
for (const run of this.runs.values()) {
run.controller.abort();
}
this.clearPendingLifecycleErrors();
this.runs.clear();
defaultRuntime.log = this.previousRuntimeLog ?? defaultRuntime.log;
defaultRuntime.error = this.previousRuntimeError ?? defaultRuntime.error;
@@ -358,6 +362,32 @@ export class EmbeddedTuiBackend implements TuiBackend {
});
}
/** Cancels the deferred chat error queued for `runId`, if one is pending. */
private clearPendingLifecycleError(runId: string) {
  const queuedTimer = this.pendingLifecycleErrors.get(runId);
  if (queuedTimer) {
    clearTimeout(queuedTimer);
    this.pendingLifecycleErrors.delete(runId);
  }
}
/** Cancels every deferred chat error and empties the tracking map. */
private clearPendingLifecycleErrors() {
  this.pendingLifecycleErrors.forEach((queuedTimer) => {
    clearTimeout(queuedTimer);
  });
  this.pendingLifecycleErrors.clear();
}
/**
 * Queues a chat error for `runId` to be emitted after
 * LIFECYCLE_ERROR_RETRY_GRACE_MS instead of emitting it right away.
 *
 * Any timer already queued for the same run is replaced, so at most one
 * deferred error exists per run.
 *
 * @param runId - Run the error belongs to; also the key in the pending map.
 * @param run - Run state handed to `emitChatError` when the timer fires.
 * @param errorMessage - Optional message forwarded to `emitChatError`.
 */
private scheduleChatError(runId: string, run: LocalRunState, errorMessage?: string) {
  // Drop any earlier deferred error for this run before queuing a new one.
  this.clearPendingLifecycleError(runId);
  const fireDeferredError = () => {
    // Forget the timer first; it has already fired and needs no cancelling.
    this.pendingLifecycleErrors.delete(runId);
    this.emitChatError(runId, run, errorMessage);
  };
  const handle = setTimeout(fireDeferredError, LIFECYCLE_ERROR_RETRY_GRACE_MS);
  // Optional call: handles without `unref` are left untouched.
  handle.unref?.();
  this.pendingLifecycleErrors.set(runId, handle);
}
private emitChatDelta(runId: string, run: LocalRunState) {
const projected = projectLiveAssistantBufferedText(run.buffer.trim(), {
suppressLeadFragments: true,
@@ -380,6 +410,7 @@ export class EmbeddedTuiBackend implements TuiBackend {
}
private emitChatFinal(runId: string, run: LocalRunState, stopReason?: string) {
this.clearPendingLifecycleError(runId);
if (run.finalSent) {
return;
}
@@ -408,6 +439,7 @@ export class EmbeddedTuiBackend implements TuiBackend {
}
private emitChatAborted(runId: string, run: LocalRunState) {
this.clearPendingLifecycleError(runId);
if (run.finalSent) {
return;
}
@@ -421,6 +453,7 @@ export class EmbeddedTuiBackend implements TuiBackend {
}
private emitChatError(runId: string, run: LocalRunState, errorMessage?: string) {
this.clearPendingLifecycleError(runId);
if (run.finalSent) {
return;
}
@@ -457,6 +490,12 @@ export class EmbeddedTuiBackend implements TuiBackend {
return;
}
const lifecyclePhase =
evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : "";
if (evt.stream !== "lifecycle" || lifecyclePhase !== "error") {
this.clearPendingLifecycleError(evt.runId);
}
if (evt.stream !== "assistant") {
this.ensureRunRegistered(evt.runId, run);
}
@@ -490,7 +529,7 @@ export class EmbeddedTuiBackend implements TuiBackend {
return;
}
const phase = typeof evt.data?.phase === "string" ? evt.data.phase : "";
const phase = lifecyclePhase;
const aborted = evt.data?.aborted === true || run.controller.signal.aborted;
if (phase === "end") {
if (aborted) {
@@ -511,7 +550,8 @@ export class EmbeddedTuiBackend implements TuiBackend {
return;
}
const errorMessage = typeof evt.data?.error === "string" ? evt.data.error : undefined;
this.emitChatError(evt.runId, run, errorMessage);
run.buffer = "";
this.scheduleChatError(evt.runId, run, errorMessage);
}
}