mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-05 07:10:23 +00:00
feat(auto-reply): add model fallback lifecycle visibility in status, verbose logs, and WebUI (#20704)
This commit is contained in:
123
src/auto-reply/fallback-state.test.ts
Normal file
123
src/auto-reply/fallback-state.test.ts
Normal file
@@ -0,0 +1,123 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
resolveActiveFallbackState,
|
||||
resolveFallbackTransition,
|
||||
type FallbackNoticeState,
|
||||
} from "./fallback-state.js";
|
||||
|
||||
const baseAttempt = {
|
||||
provider: "fireworks",
|
||||
model: "fireworks/minimax-m2p5",
|
||||
error: "Provider fireworks is in cooldown (all profiles unavailable)",
|
||||
reason: "rate_limit" as const,
|
||||
};
|
||||
|
||||
describe("fallback-state", () => {
|
||||
it("treats fallback as active only when state matches selected and active refs", () => {
|
||||
const state: FallbackNoticeState = {
|
||||
fallbackNoticeSelectedModel: "fireworks/minimax-m2p5",
|
||||
fallbackNoticeActiveModel: "deepinfra/moonshotai/Kimi-K2.5",
|
||||
fallbackNoticeReason: "rate limit",
|
||||
};
|
||||
|
||||
const resolved = resolveActiveFallbackState({
|
||||
selectedModelRef: "fireworks/minimax-m2p5",
|
||||
activeModelRef: "deepinfra/moonshotai/Kimi-K2.5",
|
||||
state,
|
||||
});
|
||||
|
||||
expect(resolved.active).toBe(true);
|
||||
expect(resolved.reason).toBe("rate limit");
|
||||
});
|
||||
|
||||
it("does not treat runtime drift as fallback when persisted state does not match", () => {
|
||||
const state: FallbackNoticeState = {
|
||||
fallbackNoticeSelectedModel: "anthropic/claude",
|
||||
fallbackNoticeActiveModel: "deepinfra/moonshotai/Kimi-K2.5",
|
||||
fallbackNoticeReason: "rate limit",
|
||||
};
|
||||
|
||||
const resolved = resolveActiveFallbackState({
|
||||
selectedModelRef: "fireworks/minimax-m2p5",
|
||||
activeModelRef: "deepinfra/moonshotai/Kimi-K2.5",
|
||||
state,
|
||||
});
|
||||
|
||||
expect(resolved.active).toBe(false);
|
||||
expect(resolved.reason).toBeUndefined();
|
||||
});
|
||||
|
||||
it("marks fallback transition when selected->active pair changes", () => {
|
||||
const resolved = resolveFallbackTransition({
|
||||
selectedProvider: "fireworks",
|
||||
selectedModel: "fireworks/minimax-m2p5",
|
||||
activeProvider: "deepinfra",
|
||||
activeModel: "moonshotai/Kimi-K2.5",
|
||||
attempts: [baseAttempt],
|
||||
state: {},
|
||||
});
|
||||
|
||||
expect(resolved.fallbackActive).toBe(true);
|
||||
expect(resolved.fallbackTransitioned).toBe(true);
|
||||
expect(resolved.fallbackCleared).toBe(false);
|
||||
expect(resolved.stateChanged).toBe(true);
|
||||
expect(resolved.reasonSummary).toBe("rate limit");
|
||||
expect(resolved.nextState.selectedModel).toBe("fireworks/minimax-m2p5");
|
||||
expect(resolved.nextState.activeModel).toBe("deepinfra/moonshotai/Kimi-K2.5");
|
||||
});
|
||||
|
||||
it("normalizes fallback reason whitespace for summaries", () => {
|
||||
const resolved = resolveFallbackTransition({
|
||||
selectedProvider: "fireworks",
|
||||
selectedModel: "fireworks/minimax-m2p5",
|
||||
activeProvider: "deepinfra",
|
||||
activeModel: "moonshotai/Kimi-K2.5",
|
||||
attempts: [{ ...baseAttempt, reason: "rate_limit\n\tburst" }],
|
||||
state: {},
|
||||
});
|
||||
|
||||
expect(resolved.reasonSummary).toBe("rate limit burst");
|
||||
});
|
||||
|
||||
it("refreshes reason when fallback remains active with same model pair", () => {
|
||||
const resolved = resolveFallbackTransition({
|
||||
selectedProvider: "fireworks",
|
||||
selectedModel: "fireworks/minimax-m2p5",
|
||||
activeProvider: "deepinfra",
|
||||
activeModel: "moonshotai/Kimi-K2.5",
|
||||
attempts: [{ ...baseAttempt, reason: "timeout" }],
|
||||
state: {
|
||||
fallbackNoticeSelectedModel: "fireworks/minimax-m2p5",
|
||||
fallbackNoticeActiveModel: "deepinfra/moonshotai/Kimi-K2.5",
|
||||
fallbackNoticeReason: "rate limit",
|
||||
},
|
||||
});
|
||||
|
||||
expect(resolved.fallbackTransitioned).toBe(false);
|
||||
expect(resolved.stateChanged).toBe(true);
|
||||
expect(resolved.nextState.reason).toBe("timeout");
|
||||
});
|
||||
|
||||
it("marks fallback as cleared when runtime returns to selected model", () => {
|
||||
const resolved = resolveFallbackTransition({
|
||||
selectedProvider: "fireworks",
|
||||
selectedModel: "fireworks/minimax-m2p5",
|
||||
activeProvider: "fireworks",
|
||||
activeModel: "fireworks/minimax-m2p5",
|
||||
attempts: [],
|
||||
state: {
|
||||
fallbackNoticeSelectedModel: "fireworks/minimax-m2p5",
|
||||
fallbackNoticeActiveModel: "deepinfra/moonshotai/Kimi-K2.5",
|
||||
fallbackNoticeReason: "rate limit",
|
||||
},
|
||||
});
|
||||
|
||||
expect(resolved.fallbackActive).toBe(false);
|
||||
expect(resolved.fallbackCleared).toBe(true);
|
||||
expect(resolved.fallbackTransitioned).toBe(false);
|
||||
expect(resolved.stateChanged).toBe(true);
|
||||
expect(resolved.nextState.selectedModel).toBeUndefined();
|
||||
expect(resolved.nextState.activeModel).toBeUndefined();
|
||||
expect(resolved.nextState.reason).toBeUndefined();
|
||||
});
|
||||
});
|
||||
180
src/auto-reply/fallback-state.ts
Normal file
180
src/auto-reply/fallback-state.ts
Normal file
@@ -0,0 +1,180 @@
|
||||
import type { SessionEntry } from "../config/sessions.js";
|
||||
import { formatProviderModelRef } from "./model-runtime.js";
|
||||
import type { RuntimeFallbackAttempt } from "./reply/agent-runner-execution.js";
|
||||
|
||||
const FALLBACK_REASON_PART_MAX = 80;
|
||||
|
||||
export type FallbackNoticeState = Pick<
|
||||
SessionEntry,
|
||||
"fallbackNoticeSelectedModel" | "fallbackNoticeActiveModel" | "fallbackNoticeReason"
|
||||
>;
|
||||
|
||||
export function normalizeFallbackModelRef(value?: string): string | undefined {
|
||||
const trimmed = String(value ?? "").trim();
|
||||
return trimmed || undefined;
|
||||
}
|
||||
|
||||
function truncateFallbackReasonPart(value: string, max = FALLBACK_REASON_PART_MAX): string {
|
||||
const text = String(value ?? "")
|
||||
.replace(/\s+/g, " ")
|
||||
.trim();
|
||||
if (text.length <= max) {
|
||||
return text;
|
||||
}
|
||||
return `${text.slice(0, Math.max(0, max - 1)).trimEnd()}…`;
|
||||
}
|
||||
|
||||
export function formatFallbackAttemptReason(attempt: RuntimeFallbackAttempt): string {
|
||||
const reason = attempt.reason?.trim();
|
||||
if (reason) {
|
||||
return reason.replace(/_/g, " ");
|
||||
}
|
||||
const code = attempt.code?.trim();
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
if (typeof attempt.status === "number") {
|
||||
return `HTTP ${attempt.status}`;
|
||||
}
|
||||
return truncateFallbackReasonPart(attempt.error || "error");
|
||||
}
|
||||
|
||||
function formatFallbackAttemptSummary(attempt: RuntimeFallbackAttempt): string {
|
||||
return `${formatProviderModelRef(attempt.provider, attempt.model)} ${formatFallbackAttemptReason(attempt)}`;
|
||||
}
|
||||
|
||||
export function buildFallbackReasonSummary(attempts: RuntimeFallbackAttempt[]): string {
|
||||
const firstAttempt = attempts[0];
|
||||
const firstReason = firstAttempt
|
||||
? formatFallbackAttemptReason(firstAttempt)
|
||||
: "selected model unavailable";
|
||||
const moreAttempts = attempts.length > 1 ? ` (+${attempts.length - 1} more attempts)` : "";
|
||||
return `${truncateFallbackReasonPart(firstReason)}${moreAttempts}`;
|
||||
}
|
||||
|
||||
export function buildFallbackAttemptSummaries(attempts: RuntimeFallbackAttempt[]): string[] {
|
||||
return attempts.map((attempt) =>
|
||||
truncateFallbackReasonPart(formatFallbackAttemptSummary(attempt)),
|
||||
);
|
||||
}
|
||||
|
||||
export function buildFallbackNotice(params: {
|
||||
selectedProvider: string;
|
||||
selectedModel: string;
|
||||
activeProvider: string;
|
||||
activeModel: string;
|
||||
attempts: RuntimeFallbackAttempt[];
|
||||
}): string | null {
|
||||
const selected = formatProviderModelRef(params.selectedProvider, params.selectedModel);
|
||||
const active = formatProviderModelRef(params.activeProvider, params.activeModel);
|
||||
if (selected === active) {
|
||||
return null;
|
||||
}
|
||||
const reasonSummary = buildFallbackReasonSummary(params.attempts);
|
||||
return `↪️ Model Fallback: ${active} (selected ${selected}; ${reasonSummary})`;
|
||||
}
|
||||
|
||||
export function buildFallbackClearedNotice(params: {
|
||||
selectedProvider: string;
|
||||
selectedModel: string;
|
||||
previousActiveModel?: string;
|
||||
}): string {
|
||||
const selected = formatProviderModelRef(params.selectedProvider, params.selectedModel);
|
||||
const previous = normalizeFallbackModelRef(params.previousActiveModel);
|
||||
if (previous && previous !== selected) {
|
||||
return `↪️ Model Fallback cleared: ${selected} (was ${previous})`;
|
||||
}
|
||||
return `↪️ Model Fallback cleared: ${selected}`;
|
||||
}
|
||||
|
||||
export function resolveActiveFallbackState(params: {
|
||||
selectedModelRef: string;
|
||||
activeModelRef: string;
|
||||
state?: FallbackNoticeState;
|
||||
}): { active: boolean; reason?: string } {
|
||||
const selected = normalizeFallbackModelRef(params.state?.fallbackNoticeSelectedModel);
|
||||
const active = normalizeFallbackModelRef(params.state?.fallbackNoticeActiveModel);
|
||||
const reason = normalizeFallbackModelRef(params.state?.fallbackNoticeReason);
|
||||
const fallbackActive =
|
||||
params.selectedModelRef !== params.activeModelRef &&
|
||||
selected === params.selectedModelRef &&
|
||||
active === params.activeModelRef;
|
||||
return {
|
||||
active: fallbackActive,
|
||||
reason: fallbackActive ? reason : undefined,
|
||||
};
|
||||
}
|
||||
|
||||
export type ResolvedFallbackTransition = {
|
||||
selectedModelRef: string;
|
||||
activeModelRef: string;
|
||||
fallbackActive: boolean;
|
||||
fallbackTransitioned: boolean;
|
||||
fallbackCleared: boolean;
|
||||
reasonSummary: string;
|
||||
attemptSummaries: string[];
|
||||
previousState: {
|
||||
selectedModel?: string;
|
||||
activeModel?: string;
|
||||
reason?: string;
|
||||
};
|
||||
nextState: {
|
||||
selectedModel?: string;
|
||||
activeModel?: string;
|
||||
reason?: string;
|
||||
};
|
||||
stateChanged: boolean;
|
||||
};
|
||||
|
||||
export function resolveFallbackTransition(params: {
|
||||
selectedProvider: string;
|
||||
selectedModel: string;
|
||||
activeProvider: string;
|
||||
activeModel: string;
|
||||
attempts: RuntimeFallbackAttempt[];
|
||||
state?: FallbackNoticeState;
|
||||
}): ResolvedFallbackTransition {
|
||||
const selectedModelRef = formatProviderModelRef(params.selectedProvider, params.selectedModel);
|
||||
const activeModelRef = formatProviderModelRef(params.activeProvider, params.activeModel);
|
||||
const previousState = {
|
||||
selectedModel: normalizeFallbackModelRef(params.state?.fallbackNoticeSelectedModel),
|
||||
activeModel: normalizeFallbackModelRef(params.state?.fallbackNoticeActiveModel),
|
||||
reason: normalizeFallbackModelRef(params.state?.fallbackNoticeReason),
|
||||
};
|
||||
const fallbackActive = selectedModelRef !== activeModelRef;
|
||||
const fallbackTransitioned =
|
||||
fallbackActive &&
|
||||
(previousState.selectedModel !== selectedModelRef ||
|
||||
previousState.activeModel !== activeModelRef);
|
||||
const fallbackCleared =
|
||||
!fallbackActive && Boolean(previousState.selectedModel || previousState.activeModel);
|
||||
const reasonSummary = buildFallbackReasonSummary(params.attempts);
|
||||
const attemptSummaries = buildFallbackAttemptSummaries(params.attempts);
|
||||
const nextState = fallbackActive
|
||||
? {
|
||||
selectedModel: selectedModelRef,
|
||||
activeModel: activeModelRef,
|
||||
reason: reasonSummary,
|
||||
}
|
||||
: {
|
||||
selectedModel: undefined,
|
||||
activeModel: undefined,
|
||||
reason: undefined,
|
||||
};
|
||||
const stateChanged =
|
||||
previousState.selectedModel !== nextState.selectedModel ||
|
||||
previousState.activeModel !== nextState.activeModel ||
|
||||
previousState.reason !== nextState.reason;
|
||||
return {
|
||||
selectedModelRef,
|
||||
activeModelRef,
|
||||
fallbackActive,
|
||||
fallbackTransitioned,
|
||||
fallbackCleared,
|
||||
reasonSummary,
|
||||
attemptSummaries,
|
||||
previousState,
|
||||
nextState,
|
||||
stateChanged,
|
||||
};
|
||||
}
|
||||
93
src/auto-reply/model-runtime.ts
Normal file
93
src/auto-reply/model-runtime.ts
Normal file
@@ -0,0 +1,93 @@
|
||||
import type { SessionEntry } from "../config/sessions.js";
|
||||
|
||||
export function formatProviderModelRef(providerRaw: string, modelRaw: string): string {
|
||||
const provider = String(providerRaw ?? "").trim();
|
||||
const model = String(modelRaw ?? "").trim();
|
||||
if (!provider) {
|
||||
return model;
|
||||
}
|
||||
if (!model) {
|
||||
return provider;
|
||||
}
|
||||
const prefix = `${provider}/`;
|
||||
if (model.toLowerCase().startsWith(prefix.toLowerCase())) {
|
||||
const normalizedModel = model.slice(prefix.length).trim();
|
||||
if (normalizedModel) {
|
||||
return `${provider}/${normalizedModel}`;
|
||||
}
|
||||
}
|
||||
return `${provider}/${model}`;
|
||||
}
|
||||
|
||||
type ModelRef = {
|
||||
provider: string;
|
||||
model: string;
|
||||
label: string;
|
||||
};
|
||||
|
||||
function normalizeModelWithinProvider(provider: string, modelRaw: string): string {
|
||||
const model = String(modelRaw ?? "").trim();
|
||||
if (!provider || !model) {
|
||||
return model;
|
||||
}
|
||||
const prefix = `${provider}/`;
|
||||
if (model.toLowerCase().startsWith(prefix.toLowerCase())) {
|
||||
const withoutPrefix = model.slice(prefix.length).trim();
|
||||
if (withoutPrefix) {
|
||||
return withoutPrefix;
|
||||
}
|
||||
}
|
||||
return model;
|
||||
}
|
||||
|
||||
function normalizeModelRef(
|
||||
rawModel: string,
|
||||
fallbackProvider: string,
|
||||
parseEmbeddedProvider = false,
|
||||
): ModelRef {
|
||||
const trimmed = String(rawModel ?? "").trim();
|
||||
const slashIndex = parseEmbeddedProvider ? trimmed.indexOf("/") : -1;
|
||||
if (slashIndex > 0) {
|
||||
const provider = trimmed.slice(0, slashIndex).trim();
|
||||
const model = trimmed.slice(slashIndex + 1).trim();
|
||||
if (provider && model) {
|
||||
return {
|
||||
provider,
|
||||
model,
|
||||
label: `${provider}/${model}`,
|
||||
};
|
||||
}
|
||||
}
|
||||
const provider = String(fallbackProvider ?? "").trim();
|
||||
const dedupedModel = normalizeModelWithinProvider(provider, trimmed);
|
||||
return {
|
||||
provider,
|
||||
model: dedupedModel || trimmed,
|
||||
label: provider ? formatProviderModelRef(provider, dedupedModel || trimmed) : trimmed,
|
||||
};
|
||||
}
|
||||
|
||||
export function resolveSelectedAndActiveModel(params: {
|
||||
selectedProvider: string;
|
||||
selectedModel: string;
|
||||
sessionEntry?: Pick<SessionEntry, "modelProvider" | "model">;
|
||||
}): {
|
||||
selected: ModelRef;
|
||||
active: ModelRef;
|
||||
activeDiffers: boolean;
|
||||
} {
|
||||
const selected = normalizeModelRef(params.selectedModel, params.selectedProvider);
|
||||
const runtimeModel = params.sessionEntry?.model?.trim();
|
||||
const runtimeProvider = params.sessionEntry?.modelProvider?.trim();
|
||||
|
||||
const active = runtimeModel
|
||||
? normalizeModelRef(runtimeModel, runtimeProvider || selected.provider, !runtimeProvider)
|
||||
: selected;
|
||||
const activeDiffers = active.provider !== selected.provider || active.model !== selected.model;
|
||||
|
||||
return {
|
||||
selected,
|
||||
active,
|
||||
activeDiffers,
|
||||
};
|
||||
}
|
||||
@@ -40,12 +40,23 @@ import type { FollowupRun } from "./queue.js";
|
||||
import { createBlockReplyDeliveryHandler } from "./reply-delivery.js";
|
||||
import type { TypingSignaler } from "./typing-mode.js";
|
||||
|
||||
export type RuntimeFallbackAttempt = {
|
||||
provider: string;
|
||||
model: string;
|
||||
error: string;
|
||||
reason?: string;
|
||||
status?: number;
|
||||
code?: string;
|
||||
};
|
||||
|
||||
export type AgentRunLoopResult =
|
||||
| {
|
||||
kind: "success";
|
||||
runId: string;
|
||||
runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
|
||||
fallbackProvider?: string;
|
||||
fallbackModel?: string;
|
||||
fallbackAttempts: RuntimeFallbackAttempt[];
|
||||
didLogHeartbeatStrip: boolean;
|
||||
autoCompactionCompleted: boolean;
|
||||
/** Payload keys sent directly (not via pipeline) during tool flush. */
|
||||
@@ -106,6 +117,7 @@ export async function runAgentTurnWithFallback(params: {
|
||||
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
|
||||
let fallbackProvider = params.followupRun.run.provider;
|
||||
let fallbackModel = params.followupRun.run.model;
|
||||
let fallbackAttempts: RuntimeFallbackAttempt[] = [];
|
||||
let didResetAfterCompactionFailure = false;
|
||||
let didRetryTransientHttpError = false;
|
||||
|
||||
@@ -397,6 +409,16 @@ export async function runAgentTurnWithFallback(params: {
|
||||
runResult = fallbackResult.result;
|
||||
fallbackProvider = fallbackResult.provider;
|
||||
fallbackModel = fallbackResult.model;
|
||||
fallbackAttempts = Array.isArray(fallbackResult.attempts)
|
||||
? fallbackResult.attempts.map((attempt) => ({
|
||||
provider: String(attempt.provider ?? ""),
|
||||
model: String(attempt.model ?? ""),
|
||||
error: String(attempt.error ?? ""),
|
||||
reason: attempt.reason ? String(attempt.reason) : undefined,
|
||||
status: typeof attempt.status === "number" ? attempt.status : undefined,
|
||||
code: attempt.code ? String(attempt.code) : undefined,
|
||||
}))
|
||||
: [];
|
||||
|
||||
// Some embedded runs surface context overflow as an error payload instead of throwing.
|
||||
// Treat those as a session-level failure and auto-recover by starting a fresh session.
|
||||
@@ -543,9 +565,11 @@ export async function runAgentTurnWithFallback(params: {
|
||||
|
||||
return {
|
||||
kind: "success",
|
||||
runId,
|
||||
runResult,
|
||||
fallbackProvider,
|
||||
fallbackModel,
|
||||
fallbackAttempts,
|
||||
didLogHeartbeatStrip,
|
||||
autoCompactionCompleted,
|
||||
directlySentBlockKeys: directlySentBlockKeys.size > 0 ? directlySentBlockKeys : undefined,
|
||||
|
||||
@@ -54,6 +54,7 @@ vi.mock("../../agents/model-fallback.js", () => ({
|
||||
result: await run(provider, model),
|
||||
provider,
|
||||
model,
|
||||
attempts: [],
|
||||
}),
|
||||
}));
|
||||
|
||||
@@ -508,6 +509,30 @@ describe("runReplyAgent typing (heartbeat)", () => {
|
||||
expect(onToolResult).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("retries transient HTTP failures once with timer-driven backoff", async () => {
|
||||
vi.useFakeTimers();
|
||||
let calls = 0;
|
||||
state.runEmbeddedPiAgentMock.mockImplementation(async () => {
|
||||
calls += 1;
|
||||
if (calls === 1) {
|
||||
throw new Error("502 Bad Gateway");
|
||||
}
|
||||
return { payloads: [{ text: "final" }], meta: {} };
|
||||
});
|
||||
|
||||
const { run } = createMinimalRun({
|
||||
typingMode: "message",
|
||||
});
|
||||
const runPromise = run();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(2_499);
|
||||
expect(calls).toBe(1);
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
await runPromise;
|
||||
expect(calls).toBe(2);
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("announces auto-compaction in verbose mode and tracks count", async () => {
|
||||
await withTempStateDir(async (stateDir) => {
|
||||
const storePath = path.join(stateDir, "sessions", "sessions.json");
|
||||
@@ -538,12 +563,482 @@ describe("runReplyAgent typing (heartbeat)", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("announces model fallback in verbose mode", async () => {
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId: "session",
|
||||
updatedAt: Date.now(),
|
||||
};
|
||||
const sessionStore = { main: sessionEntry };
|
||||
state.runEmbeddedPiAgentMock.mockResolvedValueOnce({ payloads: [{ text: "final" }], meta: {} });
|
||||
const modelFallback = await import("../../agents/model-fallback.js");
|
||||
vi.spyOn(modelFallback, "runWithModelFallback").mockImplementationOnce(
|
||||
async ({ run }: { run: (provider: string, model: string) => Promise<unknown> }) => ({
|
||||
result: await run("deepinfra", "moonshotai/Kimi-K2.5"),
|
||||
provider: "deepinfra",
|
||||
model: "moonshotai/Kimi-K2.5",
|
||||
attempts: [
|
||||
{
|
||||
provider: "fireworks",
|
||||
model: "fireworks/minimax-m2p5",
|
||||
error: "Provider fireworks is in cooldown (all profiles unavailable)",
|
||||
reason: "rate_limit",
|
||||
},
|
||||
],
|
||||
}),
|
||||
);
|
||||
|
||||
const { run } = createMinimalRun({
|
||||
resolvedVerboseLevel: "on",
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey: "main",
|
||||
});
|
||||
const res = await run();
|
||||
expect(Array.isArray(res)).toBe(true);
|
||||
const payloads = res as { text?: string }[];
|
||||
expect(payloads[0]?.text).toContain("Model Fallback:");
|
||||
expect(payloads[0]?.text).toContain("deepinfra/moonshotai/Kimi-K2.5");
|
||||
expect(sessionEntry.fallbackNoticeReason).toBe("rate limit");
|
||||
});
|
||||
|
||||
it("does not announce model fallback when verbose is off", async () => {
|
||||
const { onAgentEvent } = await import("../../infra/agent-events.js");
|
||||
state.runEmbeddedPiAgentMock.mockResolvedValueOnce({ payloads: [{ text: "final" }], meta: {} });
|
||||
const modelFallback = await import("../../agents/model-fallback.js");
|
||||
vi.spyOn(modelFallback, "runWithModelFallback").mockImplementationOnce(
|
||||
async ({ run }: { run: (provider: string, model: string) => Promise<unknown> }) => ({
|
||||
result: await run("deepinfra", "moonshotai/Kimi-K2.5"),
|
||||
provider: "deepinfra",
|
||||
model: "moonshotai/Kimi-K2.5",
|
||||
attempts: [
|
||||
{
|
||||
provider: "fireworks",
|
||||
model: "fireworks/minimax-m2p5",
|
||||
error: "Provider fireworks is in cooldown (all profiles unavailable)",
|
||||
reason: "rate_limit",
|
||||
},
|
||||
],
|
||||
}),
|
||||
);
|
||||
|
||||
const { run } = createMinimalRun({
|
||||
resolvedVerboseLevel: "off",
|
||||
});
|
||||
const phases: string[] = [];
|
||||
const off = onAgentEvent((evt) => {
|
||||
const phase = typeof evt.data?.phase === "string" ? evt.data.phase : null;
|
||||
if (evt.stream === "lifecycle" && phase) {
|
||||
phases.push(phase);
|
||||
}
|
||||
});
|
||||
const res = await run();
|
||||
off();
|
||||
const payload = Array.isArray(res) ? (res[0] as { text?: string }) : (res as { text?: string });
|
||||
expect(payload.text).not.toContain("Model Fallback:");
|
||||
expect(phases.filter((phase) => phase === "fallback")).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("announces model fallback only once per active fallback state", async () => {
|
||||
const { onAgentEvent } = await import("../../infra/agent-events.js");
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId: "session",
|
||||
updatedAt: Date.now(),
|
||||
};
|
||||
const sessionStore = { main: sessionEntry };
|
||||
|
||||
state.runEmbeddedPiAgentMock.mockResolvedValue({
|
||||
payloads: [{ text: "final" }],
|
||||
meta: {},
|
||||
});
|
||||
const modelFallback = await import("../../agents/model-fallback.js");
|
||||
const fallbackSpy = vi
|
||||
.spyOn(modelFallback, "runWithModelFallback")
|
||||
.mockImplementation(
|
||||
async ({ run }: { run: (provider: string, model: string) => Promise<unknown> }) => ({
|
||||
result: await run("deepinfra", "moonshotai/Kimi-K2.5"),
|
||||
provider: "deepinfra",
|
||||
model: "moonshotai/Kimi-K2.5",
|
||||
attempts: [
|
||||
{
|
||||
provider: "fireworks",
|
||||
model: "fireworks/minimax-m2p5",
|
||||
error: "Provider fireworks is in cooldown (all profiles unavailable)",
|
||||
reason: "rate_limit",
|
||||
},
|
||||
],
|
||||
}),
|
||||
);
|
||||
try {
|
||||
const { run } = createMinimalRun({
|
||||
resolvedVerboseLevel: "on",
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey: "main",
|
||||
});
|
||||
const fallbackEvents: Array<Record<string, unknown>> = [];
|
||||
const off = onAgentEvent((evt) => {
|
||||
if (evt.stream === "lifecycle" && evt.data?.phase === "fallback") {
|
||||
fallbackEvents.push(evt.data);
|
||||
}
|
||||
});
|
||||
const first = await run();
|
||||
const second = await run();
|
||||
off();
|
||||
|
||||
const firstText = Array.isArray(first) ? first[0]?.text : first?.text;
|
||||
const secondText = Array.isArray(second) ? second[0]?.text : second?.text;
|
||||
expect(firstText).toContain("Model Fallback:");
|
||||
expect(secondText).not.toContain("Model Fallback:");
|
||||
expect(fallbackEvents).toHaveLength(1);
|
||||
} finally {
|
||||
fallbackSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
it("re-announces model fallback after returning to selected model", async () => {
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId: "session",
|
||||
updatedAt: Date.now(),
|
||||
};
|
||||
const sessionStore = { main: sessionEntry };
|
||||
let callCount = 0;
|
||||
|
||||
state.runEmbeddedPiAgentMock.mockResolvedValue({
|
||||
payloads: [{ text: "final" }],
|
||||
meta: {},
|
||||
});
|
||||
const modelFallback = await import("../../agents/model-fallback.js");
|
||||
const fallbackSpy = vi
|
||||
.spyOn(modelFallback, "runWithModelFallback")
|
||||
.mockImplementation(
|
||||
async ({
|
||||
provider,
|
||||
model,
|
||||
run,
|
||||
}: {
|
||||
provider: string;
|
||||
model: string;
|
||||
run: (provider: string, model: string) => Promise<unknown>;
|
||||
}) => {
|
||||
callCount += 1;
|
||||
if (callCount === 2) {
|
||||
return {
|
||||
result: await run(provider, model),
|
||||
provider,
|
||||
model,
|
||||
attempts: [],
|
||||
};
|
||||
}
|
||||
return {
|
||||
result: await run("deepinfra", "moonshotai/Kimi-K2.5"),
|
||||
provider: "deepinfra",
|
||||
model: "moonshotai/Kimi-K2.5",
|
||||
attempts: [
|
||||
{
|
||||
provider: "fireworks",
|
||||
model: "fireworks/minimax-m2p5",
|
||||
error: "Provider fireworks is in cooldown (all profiles unavailable)",
|
||||
reason: "rate_limit",
|
||||
},
|
||||
],
|
||||
};
|
||||
},
|
||||
);
|
||||
try {
|
||||
const { run } = createMinimalRun({
|
||||
resolvedVerboseLevel: "on",
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey: "main",
|
||||
});
|
||||
const first = await run();
|
||||
const second = await run();
|
||||
const third = await run();
|
||||
|
||||
const firstText = Array.isArray(first) ? first[0]?.text : first?.text;
|
||||
const secondText = Array.isArray(second) ? second[0]?.text : second?.text;
|
||||
const thirdText = Array.isArray(third) ? third[0]?.text : third?.text;
|
||||
expect(firstText).toContain("Model Fallback:");
|
||||
expect(secondText).not.toContain("Model Fallback:");
|
||||
expect(thirdText).toContain("Model Fallback:");
|
||||
} finally {
|
||||
fallbackSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
it("announces fallback-cleared once when runtime returns to selected model", async () => {
|
||||
const { onAgentEvent } = await import("../../infra/agent-events.js");
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId: "session",
|
||||
updatedAt: Date.now(),
|
||||
};
|
||||
const sessionStore = { main: sessionEntry };
|
||||
let callCount = 0;
|
||||
|
||||
state.runEmbeddedPiAgentMock.mockResolvedValue({
|
||||
payloads: [{ text: "final" }],
|
||||
meta: {},
|
||||
});
|
||||
const modelFallback = await import("../../agents/model-fallback.js");
|
||||
const fallbackSpy = vi
|
||||
.spyOn(modelFallback, "runWithModelFallback")
|
||||
.mockImplementation(
|
||||
async ({
|
||||
provider,
|
||||
model,
|
||||
run,
|
||||
}: {
|
||||
provider: string;
|
||||
model: string;
|
||||
run: (provider: string, model: string) => Promise<unknown>;
|
||||
}) => {
|
||||
callCount += 1;
|
||||
if (callCount === 1) {
|
||||
return {
|
||||
result: await run("deepinfra", "moonshotai/Kimi-K2.5"),
|
||||
provider: "deepinfra",
|
||||
model: "moonshotai/Kimi-K2.5",
|
||||
attempts: [
|
||||
{
|
||||
provider: "fireworks",
|
||||
model: "fireworks/minimax-m2p5",
|
||||
error: "Provider fireworks is in cooldown (all profiles unavailable)",
|
||||
reason: "rate_limit",
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
return {
|
||||
result: await run(provider, model),
|
||||
provider,
|
||||
model,
|
||||
attempts: [],
|
||||
};
|
||||
},
|
||||
);
|
||||
try {
|
||||
const { run } = createMinimalRun({
|
||||
resolvedVerboseLevel: "on",
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey: "main",
|
||||
});
|
||||
const phases: string[] = [];
|
||||
const off = onAgentEvent((evt) => {
|
||||
const phase = typeof evt.data?.phase === "string" ? evt.data.phase : null;
|
||||
if (evt.stream === "lifecycle" && phase) {
|
||||
phases.push(phase);
|
||||
}
|
||||
});
|
||||
const first = await run();
|
||||
const second = await run();
|
||||
const third = await run();
|
||||
off();
|
||||
|
||||
const firstText = Array.isArray(first) ? first[0]?.text : first?.text;
|
||||
const secondText = Array.isArray(second) ? second[0]?.text : second?.text;
|
||||
const thirdText = Array.isArray(third) ? third[0]?.text : third?.text;
|
||||
expect(firstText).toContain("Model Fallback:");
|
||||
expect(secondText).toContain("Model Fallback cleared:");
|
||||
expect(thirdText).not.toContain("Model Fallback cleared:");
|
||||
expect(phases.filter((phase) => phase === "fallback")).toHaveLength(1);
|
||||
expect(phases.filter((phase) => phase === "fallback_cleared")).toHaveLength(1);
|
||||
} finally {
|
||||
fallbackSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
it("emits fallback lifecycle events while verbose is off", async () => {
|
||||
const { onAgentEvent } = await import("../../infra/agent-events.js");
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId: "session",
|
||||
updatedAt: Date.now(),
|
||||
};
|
||||
const sessionStore = { main: sessionEntry };
|
||||
let callCount = 0;
|
||||
|
||||
state.runEmbeddedPiAgentMock.mockResolvedValue({
|
||||
payloads: [{ text: "final" }],
|
||||
meta: {},
|
||||
});
|
||||
const modelFallback = await import("../../agents/model-fallback.js");
|
||||
const fallbackSpy = vi
|
||||
.spyOn(modelFallback, "runWithModelFallback")
|
||||
.mockImplementation(
|
||||
async ({
|
||||
provider,
|
||||
model,
|
||||
run,
|
||||
}: {
|
||||
provider: string;
|
||||
model: string;
|
||||
run: (provider: string, model: string) => Promise<unknown>;
|
||||
}) => {
|
||||
callCount += 1;
|
||||
if (callCount === 1) {
|
||||
return {
|
||||
result: await run("deepinfra", "moonshotai/Kimi-K2.5"),
|
||||
provider: "deepinfra",
|
||||
model: "moonshotai/Kimi-K2.5",
|
||||
attempts: [
|
||||
{
|
||||
provider: "fireworks",
|
||||
model: "fireworks/minimax-m2p5",
|
||||
error: "Provider fireworks is in cooldown (all profiles unavailable)",
|
||||
reason: "rate_limit",
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
return {
|
||||
result: await run(provider, model),
|
||||
provider,
|
||||
model,
|
||||
attempts: [],
|
||||
};
|
||||
},
|
||||
);
|
||||
try {
|
||||
const { run } = createMinimalRun({
|
||||
resolvedVerboseLevel: "off",
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey: "main",
|
||||
});
|
||||
const phases: string[] = [];
|
||||
const off = onAgentEvent((evt) => {
|
||||
const phase = typeof evt.data?.phase === "string" ? evt.data.phase : null;
|
||||
if (evt.stream === "lifecycle" && phase) {
|
||||
phases.push(phase);
|
||||
}
|
||||
});
|
||||
const first = await run();
|
||||
const second = await run();
|
||||
off();
|
||||
|
||||
const firstText = Array.isArray(first) ? first[0]?.text : first?.text;
|
||||
const secondText = Array.isArray(second) ? second[0]?.text : second?.text;
|
||||
expect(firstText).not.toContain("Model Fallback:");
|
||||
expect(secondText).not.toContain("Model Fallback cleared:");
|
||||
expect(phases.filter((phase) => phase === "fallback")).toHaveLength(1);
|
||||
expect(phases.filter((phase) => phase === "fallback_cleared")).toHaveLength(1);
|
||||
} finally {
|
||||
fallbackSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
it("backfills fallback reason when fallback is already active", async () => {
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId: "session",
|
||||
updatedAt: Date.now(),
|
||||
fallbackNoticeSelectedModel: "anthropic/claude",
|
||||
fallbackNoticeActiveModel: "deepinfra/moonshotai/Kimi-K2.5",
|
||||
modelProvider: "deepinfra",
|
||||
model: "moonshotai/Kimi-K2.5",
|
||||
};
|
||||
const sessionStore = { main: sessionEntry };
|
||||
|
||||
state.runEmbeddedPiAgentMock.mockResolvedValue({
|
||||
payloads: [{ text: "final" }],
|
||||
meta: {},
|
||||
});
|
||||
const modelFallback = await import("../../agents/model-fallback.js");
|
||||
const fallbackSpy = vi
|
||||
.spyOn(modelFallback, "runWithModelFallback")
|
||||
.mockImplementation(
|
||||
async ({ run }: { run: (provider: string, model: string) => Promise<unknown> }) => ({
|
||||
result: await run("deepinfra", "moonshotai/Kimi-K2.5"),
|
||||
provider: "deepinfra",
|
||||
model: "moonshotai/Kimi-K2.5",
|
||||
attempts: [
|
||||
{
|
||||
provider: "anthropic",
|
||||
model: "claude",
|
||||
error: "Provider anthropic is in cooldown (all profiles unavailable)",
|
||||
reason: "rate_limit",
|
||||
},
|
||||
],
|
||||
}),
|
||||
);
|
||||
try {
|
||||
const { run } = createMinimalRun({
|
||||
resolvedVerboseLevel: "on",
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey: "main",
|
||||
});
|
||||
const res = await run();
|
||||
const firstText = Array.isArray(res) ? res[0]?.text : res?.text;
|
||||
expect(firstText).not.toContain("Model Fallback:");
|
||||
expect(sessionEntry.fallbackNoticeReason).toBe("rate limit");
|
||||
} finally {
|
||||
fallbackSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
it("refreshes fallback reason summary while fallback stays active", async () => {
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId: "session",
|
||||
updatedAt: Date.now(),
|
||||
fallbackNoticeSelectedModel: "anthropic/claude",
|
||||
fallbackNoticeActiveModel: "deepinfra/moonshotai/Kimi-K2.5",
|
||||
fallbackNoticeReason: "rate limit",
|
||||
modelProvider: "deepinfra",
|
||||
model: "moonshotai/Kimi-K2.5",
|
||||
};
|
||||
const sessionStore = { main: sessionEntry };
|
||||
|
||||
state.runEmbeddedPiAgentMock.mockResolvedValue({
|
||||
payloads: [{ text: "final" }],
|
||||
meta: {},
|
||||
});
|
||||
const modelFallback = await import("../../agents/model-fallback.js");
|
||||
const fallbackSpy = vi
|
||||
.spyOn(modelFallback, "runWithModelFallback")
|
||||
.mockImplementation(
|
||||
async ({ run }: { run: (provider: string, model: string) => Promise<unknown> }) => ({
|
||||
result: await run("deepinfra", "moonshotai/Kimi-K2.5"),
|
||||
provider: "deepinfra",
|
||||
model: "moonshotai/Kimi-K2.5",
|
||||
attempts: [
|
||||
{
|
||||
provider: "anthropic",
|
||||
model: "claude",
|
||||
error: "Provider anthropic is in cooldown (all profiles unavailable)",
|
||||
reason: "timeout",
|
||||
},
|
||||
],
|
||||
}),
|
||||
);
|
||||
try {
|
||||
const { run } = createMinimalRun({
|
||||
resolvedVerboseLevel: "on",
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey: "main",
|
||||
});
|
||||
const res = await run();
|
||||
const firstText = Array.isArray(res) ? res[0]?.text : res?.text;
|
||||
expect(firstText).not.toContain("Model Fallback:");
|
||||
expect(sessionEntry.fallbackNoticeReason).toBe("timeout");
|
||||
} finally {
|
||||
fallbackSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
it("retries after compaction failure by resetting the session", async () => {
|
||||
await withTempStateDir(async (stateDir) => {
|
||||
const sessionId = "session";
|
||||
const storePath = path.join(stateDir, "sessions", "sessions.json");
|
||||
const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId);
|
||||
const sessionEntry = { sessionId, updatedAt: Date.now(), sessionFile: transcriptPath };
|
||||
const sessionEntry = {
|
||||
sessionId,
|
||||
updatedAt: Date.now(),
|
||||
sessionFile: transcriptPath,
|
||||
fallbackNoticeSelectedModel: "fireworks/minimax-m2p5",
|
||||
fallbackNoticeActiveModel: "deepinfra/moonshotai/Kimi-K2.5",
|
||||
fallbackNoticeReason: "rate limit",
|
||||
};
|
||||
const sessionStore = { main: sessionEntry };
|
||||
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
@@ -575,9 +1070,15 @@ describe("runReplyAgent typing (heartbeat)", () => {
|
||||
}
|
||||
expect(payload.text?.toLowerCase()).toContain("reset");
|
||||
expect(sessionStore.main.sessionId).not.toBe(sessionId);
|
||||
expect(sessionStore.main.fallbackNoticeSelectedModel).toBeUndefined();
|
||||
expect(sessionStore.main.fallbackNoticeActiveModel).toBeUndefined();
|
||||
expect(sessionStore.main.fallbackNoticeReason).toBeUndefined();
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
|
||||
expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId);
|
||||
expect(persisted.main.fallbackNoticeSelectedModel).toBeUndefined();
|
||||
expect(persisted.main.fallbackNoticeActiveModel).toBeUndefined();
|
||||
expect(persisted.main.fallbackNoticeReason).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -15,10 +15,16 @@ import {
|
||||
updateSessionStoreEntry,
|
||||
} from "../../config/sessions.js";
|
||||
import type { TypingMode } from "../../config/types.js";
|
||||
import { emitAgentEvent } from "../../infra/agent-events.js";
|
||||
import { emitDiagnosticEvent, isDiagnosticsEnabled } from "../../infra/diagnostic-events.js";
|
||||
import { enqueueSystemEvent } from "../../infra/system-events.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import { estimateUsageCost, resolveModelCostConfig } from "../../utils/usage-format.js";
|
||||
import {
|
||||
buildFallbackClearedNotice,
|
||||
buildFallbackNotice,
|
||||
resolveFallbackTransition,
|
||||
} from "../fallback-state.js";
|
||||
import type { OriginatingChannelType, TemplateContext } from "../templating.js";
|
||||
import { resolveResponseUsageMode, type VerboseLevel } from "../thinking.js";
|
||||
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
||||
@@ -290,6 +296,9 @@ export async function runReplyAgent(params: {
|
||||
updatedAt: Date.now(),
|
||||
systemSent: false,
|
||||
abortedLastRun: false,
|
||||
fallbackNoticeSelectedModel: undefined,
|
||||
fallbackNoticeActiveModel: undefined,
|
||||
fallbackNoticeReason: undefined,
|
||||
};
|
||||
const agentId = resolveAgentIdFromSessionKey(sessionKey);
|
||||
const nextSessionFile = resolveSessionTranscriptPath(
|
||||
@@ -373,7 +382,14 @@ export async function runReplyAgent(params: {
|
||||
return finalizeWithFollowup(runOutcome.payload, queueKey, runFollowupTurn);
|
||||
}
|
||||
|
||||
const { runResult, fallbackProvider, fallbackModel, directlySentBlockKeys } = runOutcome;
|
||||
const {
|
||||
runId,
|
||||
runResult,
|
||||
fallbackProvider,
|
||||
fallbackModel,
|
||||
fallbackAttempts,
|
||||
directlySentBlockKeys,
|
||||
} = runOutcome;
|
||||
let { didLogHeartbeatStrip, autoCompactionCompleted } = runOutcome;
|
||||
|
||||
if (
|
||||
@@ -414,6 +430,42 @@ export async function runReplyAgent(params: {
|
||||
const modelUsed = runResult.meta?.agentMeta?.model ?? fallbackModel ?? defaultModel;
|
||||
const providerUsed =
|
||||
runResult.meta?.agentMeta?.provider ?? fallbackProvider ?? followupRun.run.provider;
|
||||
const verboseEnabled = resolvedVerboseLevel !== "off";
|
||||
const selectedProvider = followupRun.run.provider;
|
||||
const selectedModel = followupRun.run.model;
|
||||
const fallbackStateEntry =
|
||||
activeSessionEntry ?? (sessionKey ? activeSessionStore?.[sessionKey] : undefined);
|
||||
const fallbackTransition = resolveFallbackTransition({
|
||||
selectedProvider,
|
||||
selectedModel,
|
||||
activeProvider: providerUsed,
|
||||
activeModel: modelUsed,
|
||||
attempts: fallbackAttempts,
|
||||
state: fallbackStateEntry,
|
||||
});
|
||||
if (fallbackTransition.stateChanged) {
|
||||
if (fallbackStateEntry) {
|
||||
fallbackStateEntry.fallbackNoticeSelectedModel = fallbackTransition.nextState.selectedModel;
|
||||
fallbackStateEntry.fallbackNoticeActiveModel = fallbackTransition.nextState.activeModel;
|
||||
fallbackStateEntry.fallbackNoticeReason = fallbackTransition.nextState.reason;
|
||||
fallbackStateEntry.updatedAt = Date.now();
|
||||
activeSessionEntry = fallbackStateEntry;
|
||||
}
|
||||
if (sessionKey && fallbackStateEntry && activeSessionStore) {
|
||||
activeSessionStore[sessionKey] = fallbackStateEntry;
|
||||
}
|
||||
if (sessionKey && storePath) {
|
||||
await updateSessionStoreEntry({
|
||||
storePath,
|
||||
sessionKey,
|
||||
update: async () => ({
|
||||
fallbackNoticeSelectedModel: fallbackTransition.nextState.selectedModel,
|
||||
fallbackNoticeActiveModel: fallbackTransition.nextState.activeModel,
|
||||
fallbackNoticeReason: fallbackTransition.nextState.reason,
|
||||
}),
|
||||
});
|
||||
}
|
||||
}
|
||||
const cliSessionId = isCliProvider(providerUsed, cfg)
|
||||
? runResult.meta?.agentMeta?.sessionId?.trim()
|
||||
: undefined;
|
||||
@@ -546,9 +598,68 @@ export async function runReplyAgent(params: {
|
||||
}
|
||||
}
|
||||
|
||||
// If verbose is enabled and this is a new session, prepend a session hint.
|
||||
// If verbose is enabled, prepend operational run notices.
|
||||
let finalPayloads = guardedReplyPayloads;
|
||||
const verboseEnabled = resolvedVerboseLevel !== "off";
|
||||
const verboseNotices: ReplyPayload[] = [];
|
||||
|
||||
if (verboseEnabled && activeIsNewSession) {
|
||||
verboseNotices.push({ text: `🧭 New session: ${followupRun.run.sessionId}` });
|
||||
}
|
||||
|
||||
if (fallbackTransition.fallbackTransitioned) {
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
sessionKey,
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "fallback",
|
||||
selectedProvider,
|
||||
selectedModel,
|
||||
activeProvider: providerUsed,
|
||||
activeModel: modelUsed,
|
||||
reasonSummary: fallbackTransition.reasonSummary,
|
||||
attemptSummaries: fallbackTransition.attemptSummaries,
|
||||
attempts: fallbackAttempts,
|
||||
},
|
||||
});
|
||||
if (verboseEnabled) {
|
||||
const fallbackNotice = buildFallbackNotice({
|
||||
selectedProvider,
|
||||
selectedModel,
|
||||
activeProvider: providerUsed,
|
||||
activeModel: modelUsed,
|
||||
attempts: fallbackAttempts,
|
||||
});
|
||||
if (fallbackNotice) {
|
||||
verboseNotices.push({ text: fallbackNotice });
|
||||
}
|
||||
}
|
||||
}
|
||||
if (fallbackTransition.fallbackCleared) {
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
sessionKey,
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "fallback_cleared",
|
||||
selectedProvider,
|
||||
selectedModel,
|
||||
activeProvider: providerUsed,
|
||||
activeModel: modelUsed,
|
||||
previousActiveModel: fallbackTransition.previousState.activeModel,
|
||||
},
|
||||
});
|
||||
if (verboseEnabled) {
|
||||
verboseNotices.push({
|
||||
text: buildFallbackClearedNotice({
|
||||
selectedProvider,
|
||||
selectedModel,
|
||||
previousActiveModel: fallbackTransition.previousState.activeModel,
|
||||
}),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (autoCompactionCompleted) {
|
||||
const count = await incrementRunCompactionCount({
|
||||
sessionEntry: activeSessionEntry,
|
||||
@@ -578,11 +689,11 @@ export async function runReplyAgent(params: {
|
||||
|
||||
if (verboseEnabled) {
|
||||
const suffix = typeof count === "number" ? ` (count ${count})` : "";
|
||||
finalPayloads = [{ text: `🧹 Auto-compaction complete${suffix}.` }, ...finalPayloads];
|
||||
verboseNotices.push({ text: `🧹 Auto-compaction complete${suffix}.` });
|
||||
}
|
||||
}
|
||||
if (verboseEnabled && activeIsNewSession) {
|
||||
finalPayloads = [{ text: `🧭 New session: ${followupRun.run.sessionId}` }, ...finalPayloads];
|
||||
if (verboseNotices.length > 0) {
|
||||
finalPayloads = [...verboseNotices, ...finalPayloads];
|
||||
}
|
||||
if (responseUsageLine) {
|
||||
finalPayloads = appendUsageLine(finalPayloads, responseUsageLine);
|
||||
|
||||
@@ -19,6 +19,7 @@ import {
|
||||
} from "../../infra/provider-usage.js";
|
||||
import type { MediaUnderstandingDecision } from "../../media-understanding/types.js";
|
||||
import { normalizeGroupActivation } from "../group-activation.js";
|
||||
import { resolveSelectedAndActiveModel } from "../model-runtime.js";
|
||||
import { buildStatusMessage } from "../status.js";
|
||||
import type { ElevatedLevel, ReasoningLevel, ThinkLevel, VerboseLevel } from "../thinking.js";
|
||||
import type { ReplyPayload } from "../types.js";
|
||||
@@ -136,6 +137,25 @@ export async function buildStatusReply(params: {
|
||||
const groupActivation = isGroup
|
||||
? (normalizeGroupActivation(sessionEntry?.groupActivation) ?? defaultGroupActivation())
|
||||
: undefined;
|
||||
const modelRefs = resolveSelectedAndActiveModel({
|
||||
selectedProvider: provider,
|
||||
selectedModel: model,
|
||||
sessionEntry,
|
||||
});
|
||||
const selectedModelAuth = resolveModelAuthLabel({
|
||||
provider,
|
||||
cfg,
|
||||
sessionEntry,
|
||||
agentDir: statusAgentDir,
|
||||
});
|
||||
const activeModelAuth = modelRefs.activeDiffers
|
||||
? resolveModelAuthLabel({
|
||||
provider: modelRefs.active.provider,
|
||||
cfg,
|
||||
sessionEntry,
|
||||
agentDir: statusAgentDir,
|
||||
})
|
||||
: selectedModelAuth;
|
||||
const agentDefaults = cfg.agents?.defaults ?? {};
|
||||
const statusText = buildStatusMessage({
|
||||
config: cfg,
|
||||
@@ -160,12 +180,8 @@ export async function buildStatusReply(params: {
|
||||
resolvedVerbose: resolvedVerboseLevel,
|
||||
resolvedReasoning: resolvedReasoningLevel,
|
||||
resolvedElevated: resolvedElevatedLevel,
|
||||
modelAuth: resolveModelAuthLabel({
|
||||
provider,
|
||||
cfg,
|
||||
sessionEntry,
|
||||
agentDir: statusAgentDir,
|
||||
}),
|
||||
modelAuth: selectedModelAuth,
|
||||
activeModelAuth,
|
||||
usageLine: usageLine ?? undefined,
|
||||
queue: {
|
||||
mode: queueSettings.mode,
|
||||
|
||||
@@ -106,6 +106,7 @@ export async function handleDirectiveOnly(
|
||||
allowedModelCatalog,
|
||||
resetModelOverride,
|
||||
surface: params.surface,
|
||||
sessionEntry,
|
||||
});
|
||||
if (modelInfo) {
|
||||
return modelInfo;
|
||||
|
||||
@@ -63,6 +63,32 @@ describe("/model chat UX", () => {
|
||||
expect(reply?.text).toContain("Switch: /model <provider/model>");
|
||||
});
|
||||
|
||||
it("shows active runtime model when different from selected model", async () => {
|
||||
const directives = parseInlineDirectives("/model");
|
||||
const cfg = { commands: { text: true } } as unknown as OpenClawConfig;
|
||||
|
||||
const reply = await maybeHandleModelDirectiveInfo({
|
||||
directives,
|
||||
cfg,
|
||||
agentDir: "/tmp/agent",
|
||||
activeAgentId: "main",
|
||||
provider: "fireworks",
|
||||
model: "fireworks/minimax-m2p5",
|
||||
defaultProvider: "fireworks",
|
||||
defaultModel: "fireworks/minimax-m2p5",
|
||||
aliasIndex: baseAliasIndex(),
|
||||
allowedModelCatalog: [],
|
||||
resetModelOverride: false,
|
||||
sessionEntry: {
|
||||
modelProvider: "deepinfra",
|
||||
model: "moonshotai/Kimi-K2.5",
|
||||
},
|
||||
});
|
||||
|
||||
expect(reply?.text).toContain("Current: fireworks/minimax-m2p5 (selected)");
|
||||
expect(reply?.text).toContain("Active: deepinfra/moonshotai/Kimi-K2.5 (runtime)");
|
||||
});
|
||||
|
||||
it("auto-applies closest match for typos", () => {
|
||||
const directives = parseInlineDirectives("/model anthropic/claud-opus-4-5");
|
||||
const cfg = { commands: { text: true } } as unknown as OpenClawConfig;
|
||||
|
||||
@@ -7,8 +7,10 @@ import {
|
||||
resolveModelRefFromString,
|
||||
} from "../../agents/model-selection.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import type { SessionEntry } from "../../config/sessions.js";
|
||||
import { buildBrowseProvidersButton } from "../../telegram/model-buttons.js";
|
||||
import { shortenHomePath } from "../../utils.js";
|
||||
import { resolveSelectedAndActiveModel } from "../model-runtime.js";
|
||||
import type { ReplyPayload } from "../types.js";
|
||||
import { resolveModelsCommandReply } from "./commands-models.js";
|
||||
import {
|
||||
@@ -198,6 +200,7 @@ export async function maybeHandleModelDirectiveInfo(params: {
|
||||
allowedModelCatalog: Array<{ provider: string; id?: string; name?: string }>;
|
||||
resetModelOverride: boolean;
|
||||
surface?: string;
|
||||
sessionEntry?: Pick<SessionEntry, "modelProvider" | "model">;
|
||||
}): Promise<ReplyPayload | undefined> {
|
||||
if (!params.directives.hasModelDirective) {
|
||||
return undefined;
|
||||
@@ -233,31 +236,45 @@ export async function maybeHandleModelDirectiveInfo(params: {
|
||||
}
|
||||
|
||||
if (wantsSummary) {
|
||||
const current = `${params.provider}/${params.model}`;
|
||||
const modelRefs = resolveSelectedAndActiveModel({
|
||||
selectedProvider: params.provider,
|
||||
selectedModel: params.model,
|
||||
sessionEntry: params.sessionEntry,
|
||||
});
|
||||
const current = modelRefs.selected.label;
|
||||
const isTelegram = params.surface === "telegram";
|
||||
const activeRuntimeLine = modelRefs.activeDiffers
|
||||
? `Active: ${modelRefs.active.label} (runtime)`
|
||||
: null;
|
||||
|
||||
if (isTelegram) {
|
||||
const buttons = buildBrowseProvidersButton();
|
||||
return {
|
||||
text: [
|
||||
`Current: ${current}`,
|
||||
`Current: ${current}${modelRefs.activeDiffers ? " (selected)" : ""}`,
|
||||
activeRuntimeLine,
|
||||
"",
|
||||
"Tap below to browse models, or use:",
|
||||
"/model <provider/model> to switch",
|
||||
"/model status for details",
|
||||
].join("\n"),
|
||||
]
|
||||
.filter(Boolean)
|
||||
.join("\n"),
|
||||
channelData: { telegram: { buttons } },
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
text: [
|
||||
`Current: ${current}`,
|
||||
`Current: ${current}${modelRefs.activeDiffers ? " (selected)" : ""}`,
|
||||
activeRuntimeLine,
|
||||
"",
|
||||
"Switch: /model <provider/model>",
|
||||
"Browse: /models (providers) or /models <provider> (models)",
|
||||
"More: /model status",
|
||||
].join("\n"),
|
||||
]
|
||||
.filter(Boolean)
|
||||
.join("\n"),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -284,14 +301,20 @@ export async function maybeHandleModelDirectiveInfo(params: {
|
||||
authByProvider.set(provider, formatAuthLabel(auth));
|
||||
}
|
||||
|
||||
const current = `${params.provider}/${params.model}`;
|
||||
const modelRefs = resolveSelectedAndActiveModel({
|
||||
selectedProvider: params.provider,
|
||||
selectedModel: params.model,
|
||||
sessionEntry: params.sessionEntry,
|
||||
});
|
||||
const current = modelRefs.selected.label;
|
||||
const defaultLabel = `${params.defaultProvider}/${params.defaultModel}`;
|
||||
const lines = [
|
||||
`Current: ${current}`,
|
||||
`Current: ${current}${modelRefs.activeDiffers ? " (selected)" : ""}`,
|
||||
modelRefs.activeDiffers ? `Active: ${modelRefs.active.label} (runtime)` : null,
|
||||
`Default: ${defaultLabel}`,
|
||||
`Agent: ${params.activeAgentId}`,
|
||||
`Auth file: ${formatPath(resolveAuthStorePathForDisplay(params.agentDir))}`,
|
||||
];
|
||||
].filter((line): line is string => Boolean(line));
|
||||
if (params.resetModelOverride) {
|
||||
lines.push(`(previous selection reset to default)`);
|
||||
}
|
||||
|
||||
@@ -190,7 +190,7 @@ describe("buildStatusMessage", () => {
|
||||
expect(optionsLine).not.toContain("elevated");
|
||||
});
|
||||
|
||||
it("prefers model overrides over last-run model", () => {
|
||||
it("shows selected model and active runtime model when they differ", () => {
|
||||
const text = buildStatusMessage({
|
||||
agent: {
|
||||
model: "anthropic/claude-opus-4-5",
|
||||
@@ -203,15 +203,76 @@ describe("buildStatusMessage", () => {
|
||||
modelOverride: "gpt-4.1-mini",
|
||||
modelProvider: "anthropic",
|
||||
model: "claude-haiku-4-5",
|
||||
fallbackNoticeSelectedModel: "openai/gpt-4.1-mini",
|
||||
fallbackNoticeActiveModel: "anthropic/claude-haiku-4-5",
|
||||
fallbackNoticeReason: "rate limit",
|
||||
contextTokens: 32_000,
|
||||
},
|
||||
sessionKey: "agent:main:main",
|
||||
sessionScope: "per-sender",
|
||||
queue: { mode: "collect", depth: 0 },
|
||||
modelAuth: "api-key",
|
||||
activeModelAuth: "api-key di_123…abc (deepinfra:default)",
|
||||
});
|
||||
|
||||
const normalized = normalizeTestText(text);
|
||||
expect(normalized).toContain("Model: openai/gpt-4.1-mini");
|
||||
expect(normalized).toContain("Fallback: anthropic/claude-haiku-4-5");
|
||||
expect(normalized).toContain("(rate limit)");
|
||||
expect(normalized).not.toContain(" - Reason:");
|
||||
expect(normalized).not.toContain("Active:");
|
||||
expect(normalized).toContain("di_123...abc");
|
||||
});
|
||||
|
||||
it("omits active fallback details when runtime drift does not match fallback state", () => {
|
||||
const text = buildStatusMessage({
|
||||
agent: {
|
||||
model: "openai/gpt-4.1-mini",
|
||||
contextTokens: 32_000,
|
||||
},
|
||||
sessionEntry: {
|
||||
sessionId: "runtime-drift-only",
|
||||
updatedAt: 0,
|
||||
modelProvider: "anthropic",
|
||||
model: "claude-haiku-4-5",
|
||||
fallbackNoticeSelectedModel: "fireworks/minimax-m2p5",
|
||||
fallbackNoticeActiveModel: "deepinfra/moonshotai/Kimi-K2.5",
|
||||
fallbackNoticeReason: "rate limit",
|
||||
},
|
||||
sessionKey: "agent:main:main",
|
||||
sessionScope: "per-sender",
|
||||
queue: { mode: "collect", depth: 0 },
|
||||
modelAuth: "api-key",
|
||||
activeModelAuth: "api-key di_123…abc (deepinfra:default)",
|
||||
});
|
||||
|
||||
const normalized = normalizeTestText(text);
|
||||
expect(normalized).toContain("Model: openai/gpt-4.1-mini");
|
||||
expect(normalized).not.toContain("Fallback:");
|
||||
expect(normalized).not.toContain("(rate limit)");
|
||||
});
|
||||
|
||||
it("omits active lines when runtime matches selected model", () => {
|
||||
const text = buildStatusMessage({
|
||||
agent: {
|
||||
model: "openai/gpt-4.1-mini",
|
||||
contextTokens: 32_000,
|
||||
},
|
||||
sessionEntry: {
|
||||
sessionId: "selected-active-same",
|
||||
updatedAt: 0,
|
||||
modelProvider: "openai",
|
||||
model: "gpt-4.1-mini",
|
||||
fallbackNoticeReason: "unknown",
|
||||
},
|
||||
sessionKey: "agent:main:main",
|
||||
sessionScope: "per-sender",
|
||||
queue: { mode: "collect", depth: 0 },
|
||||
modelAuth: "api-key",
|
||||
});
|
||||
|
||||
expect(normalizeTestText(text)).toContain("Model: openai/gpt-4.1-mini");
|
||||
const normalized = normalizeTestText(text);
|
||||
expect(normalized).not.toContain("Fallback:");
|
||||
});
|
||||
|
||||
it("keeps provider prefix from configured model", () => {
|
||||
|
||||
@@ -40,6 +40,8 @@ import {
|
||||
type ChatCommandDefinition,
|
||||
} from "./commands-registry.js";
|
||||
import type { CommandCategory } from "./commands-registry.types.js";
|
||||
import { resolveActiveFallbackState } from "./fallback-state.js";
|
||||
import { formatProviderModelRef, resolveSelectedAndActiveModel } from "./model-runtime.js";
|
||||
import type { ElevatedLevel, ReasoningLevel, ThinkLevel, VerboseLevel } from "./thinking.js";
|
||||
|
||||
type AgentDefaults = NonNullable<NonNullable<OpenClawConfig["agents"]>["defaults"]>;
|
||||
@@ -72,6 +74,7 @@ type StatusArgs = {
|
||||
resolvedReasoning?: ReasoningLevel;
|
||||
resolvedElevated?: ElevatedLevel;
|
||||
modelAuth?: string;
|
||||
activeModelAuth?: string;
|
||||
usageLine?: string;
|
||||
timeLine?: string;
|
||||
queue?: QueueStatus;
|
||||
@@ -339,12 +342,19 @@ export function buildStatusMessage(args: StatusArgs): string {
|
||||
defaultProvider: DEFAULT_PROVIDER,
|
||||
defaultModel: DEFAULT_MODEL,
|
||||
});
|
||||
const provider = entry?.providerOverride ?? resolved.provider ?? DEFAULT_PROVIDER;
|
||||
let model = entry?.modelOverride ?? resolved.model ?? DEFAULT_MODEL;
|
||||
const selectedProvider = entry?.providerOverride ?? resolved.provider ?? DEFAULT_PROVIDER;
|
||||
const selectedModel = entry?.modelOverride ?? resolved.model ?? DEFAULT_MODEL;
|
||||
const modelRefs = resolveSelectedAndActiveModel({
|
||||
selectedProvider,
|
||||
selectedModel,
|
||||
sessionEntry: entry,
|
||||
});
|
||||
let activeProvider = modelRefs.active.provider;
|
||||
let activeModel = modelRefs.active.model;
|
||||
let contextTokens =
|
||||
entry?.contextTokens ??
|
||||
args.agent?.contextTokens ??
|
||||
lookupContextTokens(model) ??
|
||||
lookupContextTokens(activeModel) ??
|
||||
DEFAULT_CONTEXT_TOKENS;
|
||||
|
||||
let inputTokens = entry?.inputTokens;
|
||||
@@ -366,8 +376,18 @@ export function buildStatusMessage(args: StatusArgs): string {
|
||||
if (!totalTokens || totalTokens === 0 || candidate > totalTokens) {
|
||||
totalTokens = candidate;
|
||||
}
|
||||
if (!model) {
|
||||
model = logUsage.model ?? model;
|
||||
if (!entry?.model && logUsage.model) {
|
||||
const slashIndex = logUsage.model.indexOf("/");
|
||||
if (slashIndex > 0) {
|
||||
const provider = logUsage.model.slice(0, slashIndex).trim();
|
||||
const model = logUsage.model.slice(slashIndex + 1).trim();
|
||||
if (provider && model) {
|
||||
activeProvider = provider;
|
||||
activeModel = model;
|
||||
}
|
||||
} else {
|
||||
activeModel = logUsage.model;
|
||||
}
|
||||
}
|
||||
if (!contextTokens && logUsage.model) {
|
||||
contextTokens = lookupContextTokens(logUsage.model) ?? contextTokens;
|
||||
@@ -440,14 +460,21 @@ export function buildStatusMessage(args: StatusArgs): string {
|
||||
];
|
||||
const activationLine = activationParts.filter(Boolean).join(" · ");
|
||||
|
||||
const authMode = resolveModelAuthMode(provider, args.config);
|
||||
const authLabelValue =
|
||||
args.modelAuth ?? (authMode && authMode !== "unknown" ? authMode : undefined);
|
||||
const showCost = authLabelValue === "api-key" || authLabelValue === "mixed";
|
||||
const activeAuthMode = resolveModelAuthMode(activeProvider, args.config);
|
||||
const selectedAuthLabelValue =
|
||||
args.modelAuth ??
|
||||
(() => {
|
||||
const selectedAuthMode = resolveModelAuthMode(selectedProvider, args.config);
|
||||
return selectedAuthMode && selectedAuthMode !== "unknown" ? selectedAuthMode : undefined;
|
||||
})();
|
||||
const activeAuthLabelValue =
|
||||
args.activeModelAuth ??
|
||||
(activeAuthMode && activeAuthMode !== "unknown" ? activeAuthMode : undefined);
|
||||
const showCost = activeAuthLabelValue === "api-key" || activeAuthLabelValue === "mixed";
|
||||
const costConfig = showCost
|
||||
? resolveModelCostConfig({
|
||||
provider,
|
||||
model,
|
||||
provider: activeProvider,
|
||||
model: activeModel,
|
||||
config: args.config,
|
||||
})
|
||||
: undefined;
|
||||
@@ -464,9 +491,21 @@ export function buildStatusMessage(args: StatusArgs): string {
|
||||
: undefined;
|
||||
const costLabel = showCost && hasUsage ? formatUsd(cost) : undefined;
|
||||
|
||||
const modelLabel = model ? `${provider}/${model}` : "unknown";
|
||||
const authLabel = authLabelValue ? ` · 🔑 ${authLabelValue}` : "";
|
||||
const modelLine = `🧠 Model: ${modelLabel}${authLabel}`;
|
||||
const selectedModelLabel = modelRefs.selected.label || "unknown";
|
||||
const activeModelLabel = formatProviderModelRef(activeProvider, activeModel) || "unknown";
|
||||
const fallbackState = resolveActiveFallbackState({
|
||||
selectedModelRef: selectedModelLabel,
|
||||
activeModelRef: activeModelLabel,
|
||||
state: entry,
|
||||
});
|
||||
const selectedAuthLabel = selectedAuthLabelValue ? ` · 🔑 ${selectedAuthLabelValue}` : "";
|
||||
const modelLine = `🧠 Model: ${selectedModelLabel}${selectedAuthLabel}`;
|
||||
const showFallbackAuth = activeAuthLabelValue && activeAuthLabelValue !== selectedAuthLabelValue;
|
||||
const fallbackLine = fallbackState.active
|
||||
? `↪️ Fallback: ${activeModelLabel}${
|
||||
showFallbackAuth ? ` · 🔑 ${activeAuthLabelValue}` : ""
|
||||
} (${fallbackState.reason ?? "selected model unavailable"})`
|
||||
: null;
|
||||
const commit = resolveCommitHash();
|
||||
const versionLine = `🦞 OpenClaw ${VERSION}${commit ? ` (${commit})` : ""}`;
|
||||
const usagePair = formatUsagePair(inputTokens, outputTokens);
|
||||
@@ -480,6 +519,7 @@ export function buildStatusMessage(args: StatusArgs): string {
|
||||
versionLine,
|
||||
args.timeLine,
|
||||
modelLine,
|
||||
fallbackLine,
|
||||
usageCostLine,
|
||||
`📚 ${contextLine}`,
|
||||
mediaLine,
|
||||
|
||||
@@ -80,6 +80,13 @@ export type SessionEntry = {
|
||||
totalTokensFresh?: boolean;
|
||||
modelProvider?: string;
|
||||
model?: string;
|
||||
/**
|
||||
* Last selected/runtime model pair for which a fallback notice was emitted.
|
||||
* Used to avoid repeating the same fallback notice every turn.
|
||||
*/
|
||||
fallbackNoticeSelectedModel?: string;
|
||||
fallbackNoticeActiveModel?: string;
|
||||
fallbackNoticeReason?: string;
|
||||
contextTokens?: number;
|
||||
compactionCount?: number;
|
||||
memoryFlushAt?: number;
|
||||
|
||||
@@ -256,4 +256,141 @@ describe("agent event handler", () => {
|
||||
expect(payload.data?.result).toEqual(result);
|
||||
resetAgentRunContextForTest();
|
||||
});
|
||||
|
||||
it("broadcasts fallback events to agent subscribers and node session", () => {
|
||||
const { broadcast, broadcastToConnIds, nodeSendToSession, handler } = createHarness({
|
||||
resolveSessionKeyForRun: () => "session-fallback",
|
||||
});
|
||||
|
||||
handler({
|
||||
runId: "run-fallback",
|
||||
seq: 1,
|
||||
stream: "lifecycle",
|
||||
ts: Date.now(),
|
||||
data: {
|
||||
phase: "fallback",
|
||||
selectedProvider: "fireworks",
|
||||
selectedModel: "fireworks/minimax-m2p5",
|
||||
activeProvider: "deepinfra",
|
||||
activeModel: "moonshotai/Kimi-K2.5",
|
||||
},
|
||||
});
|
||||
|
||||
expect(broadcastToConnIds).not.toHaveBeenCalled();
|
||||
const broadcastAgentCalls = broadcast.mock.calls.filter(([event]) => event === "agent");
|
||||
expect(broadcastAgentCalls).toHaveLength(1);
|
||||
const payload = broadcastAgentCalls[0]?.[1] as {
|
||||
sessionKey?: string;
|
||||
stream?: string;
|
||||
data?: Record<string, unknown>;
|
||||
};
|
||||
expect(payload.stream).toBe("lifecycle");
|
||||
expect(payload.data?.phase).toBe("fallback");
|
||||
expect(payload.sessionKey).toBe("session-fallback");
|
||||
expect(payload.data?.activeProvider).toBe("deepinfra");
|
||||
|
||||
const nodeCalls = nodeSendToSession.mock.calls.filter(([, event]) => event === "agent");
|
||||
expect(nodeCalls).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("remaps chat-linked lifecycle runId to client runId", () => {
|
||||
const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness({
|
||||
resolveSessionKeyForRun: () => "session-fallback",
|
||||
});
|
||||
chatRunState.registry.add("run-fallback-internal", {
|
||||
sessionKey: "session-fallback",
|
||||
clientRunId: "run-fallback-client",
|
||||
});
|
||||
|
||||
handler({
|
||||
runId: "run-fallback-internal",
|
||||
seq: 1,
|
||||
stream: "lifecycle",
|
||||
ts: Date.now(),
|
||||
data: {
|
||||
phase: "fallback",
|
||||
selectedProvider: "fireworks",
|
||||
selectedModel: "fireworks/minimax-m2p5",
|
||||
activeProvider: "deepinfra",
|
||||
activeModel: "moonshotai/Kimi-K2.5",
|
||||
},
|
||||
});
|
||||
|
||||
const broadcastAgentCalls = broadcast.mock.calls.filter(([event]) => event === "agent");
|
||||
expect(broadcastAgentCalls).toHaveLength(1);
|
||||
const payload = broadcastAgentCalls[0]?.[1] as {
|
||||
runId?: string;
|
||||
sessionKey?: string;
|
||||
stream?: string;
|
||||
data?: Record<string, unknown>;
|
||||
};
|
||||
expect(payload.runId).toBe("run-fallback-client");
|
||||
expect(payload.stream).toBe("lifecycle");
|
||||
expect(payload.data?.phase).toBe("fallback");
|
||||
|
||||
const nodeCalls = nodeSendToSession.mock.calls.filter(([, event]) => event === "agent");
|
||||
expect(nodeCalls).toHaveLength(1);
|
||||
const nodePayload = nodeCalls[0]?.[2] as { runId?: string };
|
||||
expect(nodePayload.runId).toBe("run-fallback-client");
|
||||
});
|
||||
|
||||
it("uses agent event sessionKey when run-context lookup cannot resolve", () => {
|
||||
const { broadcast, handler } = createHarness({
|
||||
resolveSessionKeyForRun: () => undefined,
|
||||
});
|
||||
|
||||
handler({
|
||||
runId: "run-fallback-session-key",
|
||||
seq: 1,
|
||||
stream: "lifecycle",
|
||||
ts: Date.now(),
|
||||
sessionKey: "session-from-event",
|
||||
data: {
|
||||
phase: "fallback",
|
||||
selectedProvider: "fireworks",
|
||||
selectedModel: "fireworks/minimax-m2p5",
|
||||
activeProvider: "deepinfra",
|
||||
activeModel: "moonshotai/Kimi-K2.5",
|
||||
},
|
||||
});
|
||||
|
||||
const broadcastAgentCalls = broadcast.mock.calls.filter(([event]) => event === "agent");
|
||||
expect(broadcastAgentCalls).toHaveLength(1);
|
||||
const payload = broadcastAgentCalls[0]?.[1] as { sessionKey?: string };
|
||||
expect(payload.sessionKey).toBe("session-from-event");
|
||||
});
|
||||
|
||||
it("remaps chat-linked tool runId for non-full verbose payloads", () => {
|
||||
const { broadcastToConnIds, chatRunState, toolEventRecipients, handler } = createHarness({
|
||||
resolveSessionKeyForRun: () => "session-tool-remap",
|
||||
});
|
||||
|
||||
chatRunState.registry.add("run-tool-internal", {
|
||||
sessionKey: "session-tool-remap",
|
||||
clientRunId: "run-tool-client",
|
||||
});
|
||||
registerAgentRunContext("run-tool-internal", {
|
||||
sessionKey: "session-tool-remap",
|
||||
verboseLevel: "on",
|
||||
});
|
||||
toolEventRecipients.add("run-tool-internal", "conn-1");
|
||||
|
||||
handler({
|
||||
runId: "run-tool-internal",
|
||||
seq: 1,
|
||||
stream: "tool",
|
||||
ts: Date.now(),
|
||||
data: {
|
||||
phase: "result",
|
||||
name: "exec",
|
||||
toolCallId: "tool-remap-1",
|
||||
result: { content: [{ type: "text", text: "secret" }] },
|
||||
},
|
||||
});
|
||||
|
||||
expect(broadcastToConnIds).toHaveBeenCalledTimes(1);
|
||||
const payload = broadcastToConnIds.mock.calls[0]?.[1] as { runId?: string };
|
||||
expect(payload.runId).toBe("run-tool-client");
|
||||
resetAgentRunContextForTest();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -325,12 +325,17 @@ export function createAgentEventHandler({
|
||||
|
||||
return (evt: AgentEventPayload) => {
|
||||
const chatLink = chatRunState.registry.peek(evt.runId);
|
||||
const sessionKey = chatLink?.sessionKey ?? resolveSessionKeyForRun(evt.runId);
|
||||
const eventSessionKey =
|
||||
typeof evt.sessionKey === "string" && evt.sessionKey.trim() ? evt.sessionKey : undefined;
|
||||
const sessionKey =
|
||||
chatLink?.sessionKey ?? eventSessionKey ?? resolveSessionKeyForRun(evt.runId);
|
||||
const clientRunId = chatLink?.clientRunId ?? evt.runId;
|
||||
const eventRunId = chatLink?.clientRunId ?? evt.runId;
|
||||
const eventForClients = chatLink ? { ...evt, runId: eventRunId } : evt;
|
||||
const isAborted =
|
||||
chatRunState.abortedRuns.has(clientRunId) || chatRunState.abortedRuns.has(evt.runId);
|
||||
// Include sessionKey so Control UI can filter tool streams per session.
|
||||
const agentPayload = sessionKey ? { ...evt, sessionKey } : evt;
|
||||
const agentPayload = sessionKey ? { ...eventForClients, sessionKey } : eventForClients;
|
||||
const last = agentRunSeq.get(evt.runId) ?? 0;
|
||||
const isToolEvent = evt.stream === "tool";
|
||||
const toolVerbose = isToolEvent ? resolveToolVerboseLevel(evt.runId, sessionKey) : "off";
|
||||
@@ -341,12 +346,14 @@ export function createAgentEventHandler({
|
||||
const data = evt.data ? { ...evt.data } : {};
|
||||
delete data.result;
|
||||
delete data.partialResult;
|
||||
return sessionKey ? { ...evt, sessionKey, data } : { ...evt, data };
|
||||
return sessionKey
|
||||
? { ...eventForClients, sessionKey, data }
|
||||
: { ...eventForClients, data };
|
||||
})()
|
||||
: agentPayload;
|
||||
if (evt.seq !== last + 1) {
|
||||
broadcast("agent", {
|
||||
runId: evt.runId,
|
||||
runId: eventRunId,
|
||||
stream: "error",
|
||||
ts: Date.now(),
|
||||
sessionKey,
|
||||
@@ -399,7 +406,7 @@ export function createAgentEventHandler({
|
||||
} else {
|
||||
emitChatFinal(
|
||||
sessionKey,
|
||||
evt.runId,
|
||||
eventRunId,
|
||||
evt.seq,
|
||||
lifecyclePhase === "error" ? "error" : "done",
|
||||
evt.data?.error,
|
||||
|
||||
Reference in New Issue
Block a user