diff --git a/CHANGELOG.md b/CHANGELOG.md index 40a7cd4ef73..4d5847bba62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai ### Changes +- Agents/failover: harden state-aware lane suspension by persisting quota resume transitions, restoring configured lane concurrency, preserving non-quota failure reasons, and exporting model failover events through diagnostics OTLP. Thanks @BunsDev. - Telegram: preserve the channel-specific 10-option poll cap in the unified outbound adapter so over-limit polls are rejected before send. (#78762) Thanks @obviyus. - Runtime/install: raise the supported Node 22 floor to `22.16+` so native SQLite query handling can rely on the `node:sqlite` statement metadata API while continuing to recommend Node 24. (#78921) - Discord/voice: include a bounded one-line STT transcript preview in verbose voice logs so live voice debugging shows what speakers said before the agent reply. diff --git a/extensions/diagnostics-otel/src/service.test.ts b/extensions/diagnostics-otel/src/service.test.ts index f4c05b3d73f..02fba01e2b8 100644 --- a/extensions/diagnostics-otel/src/service.test.ts +++ b/extensions/diagnostics-otel/src/service.test.ts @@ -1520,6 +1520,55 @@ describe("diagnostics-otel service", () => { await service.stop?.(ctx); }); + test("exports model failover spans", async () => { + const service = createDiagnosticsOtelService(); + const ctx = createOtelContext(OTEL_TEST_ENDPOINT, { traces: true }); + await service.start(ctx); + + emitTrustedDiagnosticEvent({ + type: "model.failover", + sessionId: "session-1", + lane: "main", + fromProvider: "anthropic", + fromModel: "claude-opus-4-6", + toProvider: "openai", + toModel: "gpt-5.4", + reason: "overloaded", + suspended: true, + cascadeDepth: 1, + }); + await flushDiagnosticEvents(); + + const failoverCall = telemetryState.tracer.startSpan.mock.calls.find( + (call) => call[0] === "openclaw.model.failover", + ); + expect(failoverCall?.[1]).toMatchObject({ + 
attributes: { + "openclaw.provider": "anthropic", + "openclaw.model": "claude-opus-4-6", + "openclaw.failover.to_provider": "openai", + "openclaw.failover.to_model": "gpt-5.4", + "openclaw.failover.reason": "overloaded", + "openclaw.failover.suspended": true, + "openclaw.failover.cascade_depth": 1, + "openclaw.lane": "main", + }, + startTime: expect.any(Number), + }); + expect(failoverCall?.[1]).toEqual({ + attributes: expect.not.objectContaining({ + "openclaw.sessionId": expect.anything(), + "openclaw.sessionKey": expect.anything(), + }), + startTime: expect.any(Number), + }); + const span = telemetryState.spans.find( + (candidate) => candidate.name === "openclaw.model.failover", + ); + expect(span?.end).toHaveBeenCalledWith(expect.any(Number)); + await service.stop?.(ctx); + }); + test("maps model call APIs to GenAI operation names and error type", async () => { const service = createDiagnosticsOtelService(); const ctx = createOtelContext(OTEL_TEST_ENDPOINT, { traces: true, metrics: true }); diff --git a/extensions/diagnostics-otel/src/service.ts b/extensions/diagnostics-otel/src/service.ts index c22bc53a5f5..def592bd25d 100644 --- a/extensions/diagnostics-otel/src/service.ts +++ b/extensions/diagnostics-otel/src/service.ts @@ -83,6 +83,7 @@ type ModelCallLifecycleDiagnosticEvent = Extract< DiagnosticEventPayload, { type: "model.call.completed" | "model.call.error" } >; +type ModelFailoverDiagnosticEvent = Extract<DiagnosticEventPayload, { type: "model.failover" }>; type HarnessRunDiagnosticEvent = Extract< DiagnosticEventPayload, { type: "harness.run.started" | "harness.run.completed" | "harness.run.error" } >; @@ -1844,6 +1845,44 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { span.end(evt.ts); }; + const recordModelFailover = ( + evt: ModelFailoverDiagnosticEvent, + metadata: DiagnosticEventMetadata, + ) => { + if (!tracesEnabled) { + return; + } + const spanAttrs: Record<string, string | number | boolean> = { + "openclaw.failover.reason": lowCardinalityAttr(evt.reason, "unknown"), + }; + if (evt.fromProvider) { + 
spanAttrs["openclaw.provider"] = evt.fromProvider; + } + if (evt.fromModel) { + spanAttrs["openclaw.model"] = evt.fromModel; + } + if (evt.toProvider) { + spanAttrs["openclaw.failover.to_provider"] = evt.toProvider; + } + if (evt.toModel) { + spanAttrs["openclaw.failover.to_model"] = evt.toModel; + } + if (evt.lane) { + spanAttrs["openclaw.lane"] = lowCardinalityAttr(evt.lane, "unknown"); + } + if (evt.suspended !== undefined) { + spanAttrs["openclaw.failover.suspended"] = evt.suspended; + } + if (evt.cascadeDepth !== undefined) { + spanAttrs["openclaw.failover.cascade_depth"] = evt.cascadeDepth; + } + const span = spanWithDuration("openclaw.model.failover", spanAttrs, 0, { + parentContext: activeTrustedParentContext(evt, metadata), + endTimeMs: evt.ts, + }); + span.end(evt.ts); + }; + const modelCallMetricAttrs = (evt: ModelCallLifecycleDiagnosticEvent) => ({ "openclaw.provider": evt.provider, "openclaw.model": evt.model, @@ -2421,6 +2460,9 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { return; case "payload.large": return; + case "model.failover": + recordModelFailover(evt, metadata); + return; } } catch (err) { ctx.logger.error( diff --git a/src/agents/compaction.ts b/src/agents/compaction.ts index 888daf7bfec..7a1721aeb8d 100644 --- a/src/agents/compaction.ts +++ b/src/agents/compaction.ts @@ -40,6 +40,22 @@ const IDENTIFIER_PRESERVATION_INSTRUCTIONS = "Preserve all opaque identifiers exactly as written (no shortening or reconstruction), " + "including UUIDs, hashes, IDs, hostnames, IPs, ports, URLs, and file names."; +const HANDOFF_INSTRUCTIONS = [ + "Generate a concise recovery briefing for a new LLM taking over this session.", + "The previous model hit a quota limit and you are providing the context for a smooth handoff.", + "", + "LEADER HIERARCHY REINFORCEMENT:", + "- Explicitly state that the new model is the LEADER (Orchestrator).", + "- Identify any active autonomous units (like AutoClaw) as SUBORDINATES.", + "- Instruct the 
new model to NOT perform the subordinate's task, but to supervise and provide strategic commands.", + "", + "MUST CAPTURE:", + "- Current high-level goal and project path.", + "- Status of the latest tool executions (especially AutoClaw/Subagents).", + "- Critical files currently being modified.", + "- Pending items and next intended steps.", +].join("\n"); + export type CompactionSummarizationInstructions = { identifierPolicy?: AgentCompactionIdentifierPolicy; identifierInstructions?: string; @@ -518,6 +534,7 @@ export function pruneHistoryForContextShare(params: { maxContextTokens: number; maxHistoryShare?: number; parts?: number; + mode?: "share" | "handoff"; }): { messages: AgentMessage[]; droppedMessagesList: AgentMessage[]; @@ -527,7 +544,9 @@ export function pruneHistoryForContextShare(params: { keptTokens: number; budgetTokens: number; } { - const maxHistoryShare = params.maxHistoryShare ?? 0.5; + const isHandoff = params.mode === "handoff"; + const defaultShare = isHandoff ? 0.2 : 0.5; // Stricter budget for handoff snapshots + const maxHistoryShare = params.maxHistoryShare ?? defaultShare; const budgetTokens = Math.max(1, Math.floor(params.maxContextTokens * maxHistoryShare)); let keptMessages = params.messages; const allDroppedMessages: AgentMessage[] = []; @@ -577,6 +596,36 @@ }; } +/** + * Generates a concise handoff summary for model transitions, enforcing a 4000 token limit. + */ +export async function summarizeForHandoff(params: { + messages: AgentMessage[]; + model: NonNullable<ExtensionContext["model"]>; + apiKey: string; + headers?: Record<string, string>; + signal: AbortSignal; + maxChunkTokens: number; + contextWindow: number; + customInstructions?: string; + summarizationInstructions?: CompactionSummarizationInstructions; +}): Promise<string> { + const custom = params.customInstructions?.trim(); + const handoffInstructions = custom + ? 
`${HANDOFF_INSTRUCTIONS}\n\n${custom}` + : HANDOFF_INSTRUCTIONS; + + // Use a hard cap of 4000 tokens for the handoff summary as per plan + const handoffMaxTokens = 4000; + + return summarizeWithFallback({ + ...params, + reserveTokens: SUMMARIZATION_OVERHEAD_TOKENS, + maxChunkTokens: Math.min(params.maxChunkTokens, handoffMaxTokens), + customInstructions: handoffInstructions, + }); +} + export function resolveContextWindowTokens(model?: ExtensionContext["model"]): number { const effective = (model as { contextTokens?: number } | undefined)?.contextTokens ?? model?.contextWindow; diff --git a/src/agents/failover-error.ts b/src/agents/failover-error.ts index abe215d1f6c..65e7d673b68 100644 --- a/src/agents/failover-error.ts +++ b/src/agents/failover-error.ts @@ -27,6 +27,7 @@ export class FailoverError extends Error { // See #42713. readonly sessionId?: string; readonly lane?: string; + readonly suspend?: boolean; constructor( message: string, @@ -41,6 +42,7 @@ export class FailoverError extends Error { sessionId?: string; lane?: string; cause?: unknown; + suspend?: boolean; }, ) { super(message, { cause: params.cause }); @@ -54,6 +56,7 @@ export class FailoverError extends Error { this.rawError = params.rawError; this.sessionId = params.sessionId; this.lane = params.lane; + this.suspend = params.suspend; } } @@ -486,6 +489,10 @@ export function coerceToFailoverError( const status = signal.status ?? resolveFailoverStatus(reason); const code = signal.code; + // Suspend when hitting rate limits or billing issues in an attributed session + const shouldSuspend = + Boolean(context?.sessionId) && (reason === "rate_limit" || reason === "billing"); + return new FailoverError(message, { reason, provider: context?.provider ?? signal.provider, @@ -497,5 +504,6 @@ export function coerceToFailoverError( code, rawError: message, cause: err instanceof Error ? 
err : undefined, + suspend: shouldSuspend, }); } diff --git a/src/agents/model-fallback.test.ts b/src/agents/model-fallback.test.ts index 3fe1ba7e22a..18c86bead79 100644 --- a/src/agents/model-fallback.test.ts +++ b/src/agents/model-fallback.test.ts @@ -1672,6 +1672,13 @@ describe("runWithModelFallback", () => { return { dir: tmpDir }; } + it("maps non-quota cooldown suspensions to circuit-open session state", () => { + expect(__testing.resolveSessionSuspensionReason("rate_limit")).toBe("quota_exhausted"); + expect(__testing.resolveSessionSuspensionReason("overloaded")).toBe("circuit_open"); + expect(__testing.resolveSessionSuspensionReason("timeout")).toBe("circuit_open"); + expect(__testing.resolveSessionSuspensionReason("billing")).toBe("manual"); + }); + it("attempts same-provider fallbacks during transient cooldowns", async () => { const { dir } = await makeAuthStoreWithCooldown("anthropic", "timeout"); const cfg = makeCfg({ diff --git a/src/agents/model-fallback.ts b/src/agents/model-fallback.ts index 7271831dbff..d0563e6d22c 100644 --- a/src/agents/model-fallback.ts +++ b/src/agents/model-fallback.ts @@ -3,6 +3,7 @@ import { resolveAgentModelPrimaryValue, } from "../config/model-input.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { emitFailoverEvent } from "../infra/diagnostic-events.js"; import { formatErrorMessage } from "../infra/errors.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { createLazyImportLoader } from "../shared/lazy-promise.js"; @@ -41,6 +42,7 @@ import { } from "./model-selection-resolve.js"; import { isLikelyContextOverflowError } from "./pi-embedded-helpers/errors.js"; import type { FailoverReason } from "./pi-embedded-helpers/types.js"; +import { resolveSessionSuspensionReason, suspendSession } from "./session-suspension.js"; const log = createSubsystemLogger("model-fallback"); @@ -397,10 +399,25 @@ function throwFallbackFailureSummary(params: { formatAttempt: (attempt: 
FallbackAttempt) => string; soonestCooldownExpiry?: number | null; attribution?: FailoverAttribution; + cfg?: OpenClawConfig; + agentDir?: string; }): never { if (params.attempts.length <= 1 && params.lastError) { throw params.lastError; } + + if (params.attribution?.sessionId) { + void suspendSession({ + cfg: params.cfg, + agentDir: params.agentDir, + sessionId: params.attribution.sessionId, + laneId: params.attribution.lane, + reason: "circuit_open", + failedProvider: params.attempts[params.attempts.length - 1]?.provider ?? "unknown", + failedModel: params.attempts[params.attempts.length - 1]?.model ?? "unknown", + }); + } + const summary = params.attempts.length > 0 ? params.attempts.map(params.formatAttempt).join(" | ") : "unknown"; throw new FallbackSummaryError( @@ -529,6 +546,7 @@ export const __testing = { resolveFallbackCandidates, resolveImageFallbackCandidates, resolveCooldownDecision, + resolveSessionSuspensionReason, } as const; function resolveFallbackCandidates(params: { @@ -725,6 +743,11 @@ type CooldownDecision = type: "attempt"; reason: FailoverReason; markProbe: boolean; + } + | { + type: "suspend_lanes"; + reason: FailoverReason; + leaderCandidate?: ModelCandidate; }; function resolveCooldownDecision(params: { @@ -777,9 +800,9 @@ function resolveCooldownDecision(params: { return { type: "attempt", reason: inferredReason, markProbe: true }; } return { - type: "skip", + type: "suspend_lanes", reason: inferredReason, - error: `Provider ${params.candidate.provider} has ${inferredReason} issue (skipping all models)`, + leaderCandidate: params.candidate, }; } @@ -788,9 +811,9 @@ function resolveCooldownDecision(params: { (!params.isPrimary && shouldUseTransientCooldownProbeSlot(inferredReason)); if (!shouldAttemptDespiteCooldown) { return { - type: "skip", + type: "suspend_lanes", reason: inferredReason, - error: `Provider ${params.candidate.provider} is in cooldown (all profiles unavailable)`, + leaderCandidate: params.candidate, }; } @@ -897,6 
+920,56 @@ export async function runWithModelFallback(params: { profileIds, }); + if (decision.type === "suspend_lanes") { + const error = `Provider ${candidate.provider} is in cooldown (suspending lanes)`; + attempts.push({ + provider: candidate.provider, + model: candidate.model, + error, + reason: decision.reason, + }); + + if (params.sessionId) { + emitFailoverEvent({ + sessionId: params.sessionId, + lane: params.lane, + fromProvider: candidate.provider, + fromModel: candidate.model, + reason: decision.reason, + suspended: true, + }); + void suspendSession({ + cfg: params.cfg, + agentDir: params.agentDir, + sessionId: params.sessionId, + laneId: params.lane, + reason: resolveSessionSuspensionReason(decision.reason), + failedProvider: candidate.provider, + failedModel: candidate.model, + }); + } + + await observeDecision({ + decision: "skip_candidate", + runId: params.runId, + sessionId: params.sessionId, + lane: params.lane, + requestedProvider: params.provider, + requestedModel: params.model, + candidate, + attempt: i + 1, + total: candidates.length, + reason: decision.reason, + error, + nextCandidate: candidates[i + 1], + isPrimary, + requestedModelMatched: requestedModel, + fallbackConfigured: hasFallbackCandidates, + profileCount: profileIds.length, + }); + continue; + } + if (decision.type === "skip") { attempts.push({ provider: candidate.provider, @@ -1145,6 +1218,8 @@ export async function runWithModelFallback(params: { candidates, }), attribution: { sessionId: params.sessionId, lane: params.lane }, + cfg: params.cfg, + agentDir: params.agentDir, }); } @@ -1204,5 +1279,6 @@ export async function runWithImageModelFallback(params: { lastError, label: "image models", formatAttempt: (attempt) => `${attempt.provider}/${attempt.model}: ${attempt.error}`, + cfg: params.cfg, }); } diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index c6a573b3af3..b27bc7f6bd9 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ 
b/src/agents/pi-embedded-runner/run.ts @@ -82,6 +82,7 @@ import { runAgentCleanupStep } from "../run-cleanup-timeout.js"; import { buildAgentRuntimeAuthPlan } from "../runtime-plan/auth.js"; import { buildAgentRuntimePlan } from "../runtime-plan/build.js"; import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js"; +import { resolveSessionSuspensionReason, suspendSession } from "../session-suspension.js"; import { resolveToolLoopDetectionConfig } from "../tool-loop-detection-config.js"; import { derivePromptTokens, normalizeUsage, type UsageLike } from "../usage.js"; import { redactRunIdentifier, resolveRunWorkspaceDir } from "../workspace-run.js"; @@ -1878,6 +1879,17 @@ export async function runEmbeddedPiAgent( const promptErrorDetails = normalizedPromptFailover ? describeFailoverError(normalizedPromptFailover) : describeFailoverError(promptError); + if (normalizedPromptFailover?.suspend) { + void suspendSession({ + cfg: params.config, + agentDir, + sessionId: activeSessionId ?? params.sessionId, + laneId: globalLane, + reason: resolveSessionSuspensionReason(normalizedPromptFailover.reason), + failedProvider: normalizedPromptFailover.provider ?? provider, + failedModel: normalizedPromptFailover.model ?? modelId, + }); + } const errorText = promptErrorDetails.message || formatErrorMessage(promptError); if (await maybeRefreshRuntimeAuthForAuthError(errorText, runtimeAuthRetry)) { authRetryPending = true; @@ -2248,6 +2260,17 @@ export async function runEmbeddedPiAgent( ? { status: assistantFailoverOutcome.error.status } : {}), }); + if (assistantFailoverOutcome.error.suspend) { + void suspendSession({ + cfg: params.config, + agentDir, + sessionId: activeSessionId ?? params.sessionId, + laneId: globalLane, + reason: resolveSessionSuspensionReason(assistantFailoverOutcome.error.reason), + failedProvider: assistantFailoverOutcome.error.provider ?? provider, + failedModel: assistantFailoverOutcome.error.model ?? 
modelId, + }); + } throw assistantFailoverOutcome.error; } const usageMeta = buildUsageAgentMetaFields({ diff --git a/src/agents/pi-embedded-runner/run/assistant-failover.ts b/src/agents/pi-embedded-runner/run/assistant-failover.ts index be37dde285b..a200a86e434 100644 --- a/src/agents/pi-embedded-runner/run/assistant-failover.ts +++ b/src/agents/pi-embedded-runner/run/assistant-failover.ts @@ -189,6 +189,10 @@ export async function handleAssistantFailover(params: { const status = resolveFailoverStatus(decision.reason) ?? (isTimeoutErrorMessage(message) ? 408 : undefined); params.logAssistantFailoverDecision("fallback_model", { status }); + const shouldSuspend = + Boolean(params.sessionKey) && + (decision.reason === "rate_limit" || decision.reason === "billing"); + return { action: "throw", overloadProfileRotations, @@ -199,6 +203,7 @@ export async function handleAssistantFailover(params: { profileId: params.lastProfileId, status, rawError: params.lastAssistant?.errorMessage?.trim(), + suspend: shouldSuspend, }), }; } @@ -230,6 +235,9 @@ export async function handleAssistantFailover(params: { const reason = resolveSurfaceErrorReason(decision.reason, params); const status = resolveFailoverStatus(reason) ?? (isTimeoutErrorMessage(message) ? 
408 : undefined); + const shouldSuspend = + Boolean(params.sessionKey) && (reason === "rate_limit" || reason === "billing"); + return { action: "throw", overloadProfileRotations, @@ -240,6 +248,7 @@ export async function handleAssistantFailover(params: { profileId: params.lastProfileId, status, rawError: params.lastAssistant?.errorMessage?.trim(), + suspend: shouldSuspend, }), }; } diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 8192bcfa255..1a725a44e10 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -8,8 +8,15 @@ import { SessionManager, } from "@mariozechner/pi-coding-agent"; import { isAcpRuntimeSpawnAvailable } from "../../../acp/runtime/availability.js"; +import { buildHierarchyReinforcementMessage } from "../../../auto-reply/handoff-summarizer.js"; import { filterHeartbeatPairs } from "../../../auto-reply/heartbeat-filter.js"; import { getRuntimeConfig } from "../../../config/config.js"; +import { resolveStorePath } from "../../../config/sessions/paths.js"; +import { + loadSessionStore, + runQuotaSuspensionMaintenance, + updateSessionStoreEntry, +} from "../../../config/sessions/store.js"; import type { AssembleResult } from "../../../context-engine/types.js"; import { emitTrustedDiagnosticEvent } from "../../../infra/diagnostic-events.js"; import { @@ -2218,6 +2225,43 @@ export async function runEmbeddedAttempt( sessionId: params.sessionId, policy: transcriptPolicy, }); + + if (params.sessionKey && !isRawModelRun) { + const storePath = resolveStorePath(params.config?.session?.store, { + agentId: sessionAgentId, + }); + await runQuotaSuspensionMaintenance({ storePath }); + const store = loadSessionStore(storePath, { skipCache: true }); + const sessionEntry = store[params.sessionKey]; + const suspension = sessionEntry?.quotaSuspension; + if (suspension?.state === "resuming") { + const subagents = Object.values(store) + .filter((s) => 
s.spawnedBy === sessionEntry.sessionId) + .map((s) => ({ + sessionId: s.sessionId, + role: s.subagentRole, + lastStatus: s.status, + })); + const handoffMsg = buildHierarchyReinforcementMessage({ + summary: suspension.summary ?? "No recovery briefing was captured.", + activeSubagents: subagents, + }); + validated.push(handoffMsg); + await updateSessionStoreEntry({ + storePath, + sessionKey: params.sessionKey, + update: async (entry) => { + if (entry.quotaSuspension?.state !== "resuming") { + return null; + } + return { + quotaSuspension: { ...entry.quotaSuspension, state: "active" }, + }; + }, + }); + } + } + const heartbeatSummary = params.config && sessionAgentId ? resolveHeartbeatSummaryForAgent(params.config, sessionAgentId) diff --git a/src/agents/pi-embedded-subscribe.tools.ts b/src/agents/pi-embedded-subscribe.tools.ts index 9e35cf09b15..1fdca06f755 100644 --- a/src/agents/pi-embedded-subscribe.tools.ts +++ b/src/agents/pi-embedded-subscribe.tools.ts @@ -321,10 +321,12 @@ export function filterToolResultMediaUrls( // registered tool's media trust. TTS-generated local files carry a // separate trusted-media flag from the owned tool result, so they can // survive runs whose exact built-in set omitted the raw tts name. 
- if (builtinToolNames !== undefined && !trustedOwnedTtsLocalMedia) { - const registeredName = toolName?.trim(); - if (!registeredName || !builtinToolNames.has(registeredName)) { - return mediaUrls.filter((url) => HTTP_URL_RE.test(url.trim())); + if (builtinToolNames !== undefined) { + if (!trustedOwnedTtsLocalMedia) { + const registeredName = toolName?.trim(); + if (!registeredName || !builtinToolNames.has(registeredName)) { + return mediaUrls.filter((url) => HTTP_URL_RE.test(url.trim())); + } } } return mediaUrls; diff --git a/src/agents/session-suspension.test.ts b/src/agents/session-suspension.test.ts new file mode 100644 index 00000000000..376484bd2da --- /dev/null +++ b/src/agents/session-suspension.test.ts @@ -0,0 +1,75 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { CommandLane } from "../process/lanes.js"; + +const sessionStoreMocks = vi.hoisted(() => ({ + updateSessionStoreEntry: vi.fn(async (params: { update: (entry: unknown) => unknown }) => { + await params.update({ sessionId: "session-1" }); + }), +})); + +const commandQueueMocks = vi.hoisted(() => ({ + setCommandLaneConcurrency: vi.fn(), +})); + +vi.mock("../config/sessions.js", () => sessionStoreMocks); + +vi.mock("../process/command-queue.js", () => commandQueueMocks); + +vi.mock("./command/session.js", () => ({ + resolveStoredSessionKeyForSessionId: () => ({ + sessionKey: "session-key", + storePath: "/tmp/openclaw-session-suspension-test/sessions.json", + }), +})); + +async function suspendMainLane(ttlMs: number, cfg: OpenClawConfig) { + const { suspendSession } = await import("./session-suspension.js"); + await suspendSession({ + cfg, + sessionId: "session-1", + laneId: CommandLane.Main, + reason: "quota_exhausted", + failedProvider: "anthropic", + failedModel: "claude-opus-4-6", + ttlMs, + }); +} + +describe("session suspension", () => { + afterEach(async () => { + const { cancelLaneAutoResume } = 
await import("./session-suspension.js"); + cancelLaneAutoResume(CommandLane.Main); + vi.useRealTimers(); + sessionStoreMocks.updateSessionStoreEntry.mockClear(); + commandQueueMocks.setCommandLaneConcurrency.mockClear(); + }); + + it("auto-resumes main lane to configured agent concurrency", async () => { + vi.useFakeTimers(); + const cfg = { + agents: { defaults: { maxConcurrent: 4 } }, + } as OpenClawConfig; + + await suspendMainLane(100, cfg); + + expect(commandQueueMocks.setCommandLaneConcurrency).toHaveBeenCalledWith(CommandLane.Main, 0); + + await vi.advanceTimersByTimeAsync(100); + + expect(commandQueueMocks.setCommandLaneConcurrency).toHaveBeenLastCalledWith( + CommandLane.Main, + 4, + ); + }); + + it("maps failover reasons to persisted suspension reasons", async () => { + const { __testing } = await import("./session-suspension.js"); + + expect(__testing.resolveSessionSuspensionReason("rate_limit")).toBe("quota_exhausted"); + expect(__testing.resolveSessionSuspensionReason("billing")).toBe("manual"); + expect(__testing.resolveSessionSuspensionReason("overloaded")).toBe("circuit_open"); + expect(__testing.resolveSessionSuspensionReason("timeout")).toBe("circuit_open"); + expect(__testing.resolveSessionSuspensionReason("auth")).toBe("circuit_open"); + }); +}); diff --git a/src/agents/session-suspension.ts b/src/agents/session-suspension.ts new file mode 100644 index 00000000000..f136bdbf16a --- /dev/null +++ b/src/agents/session-suspension.ts @@ -0,0 +1,141 @@ +import path from "node:path"; +import { resolveAgentMaxConcurrent, resolveSubagentMaxConcurrent } from "../config/agent-limits.js"; +import { updateSessionStoreEntry } from "../config/sessions.js"; +import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; +import { setCommandLaneConcurrency } from "../process/command-queue.js"; +import { resolveStoredSessionKeyForSessionId } from "./command/session.js"; +import type { 
FailoverReason } from "./pi-embedded-helpers/types.js"; + +const log = createSubsystemLogger("session-suspension"); + +const DEFAULT_CUSTOM_LANE_RESUME_CONCURRENCY = 1; +export const DEFAULT_QUOTA_SUSPENSION_RESUME_MS = 30 * 60 * 1000; // 30 min + +const laneResumeTimers = new Map<string, ReturnType<typeof setTimeout>>(); + +export type SessionSuspensionReason = "quota_exhausted" | "manual" | "circuit_open"; + +function resolveLaneResumeConcurrency(cfg: OpenClawConfig | undefined, laneId: string): number { + switch (laneId) { + case "main": + return resolveAgentMaxConcurrent(cfg); + case "subagent": + return resolveSubagentMaxConcurrent(cfg); + case "cron": + case "cron-nested": { + const raw = cfg?.cron?.maxConcurrentRuns; + return typeof raw === "number" && Number.isFinite(raw) ? Math.max(1, Math.floor(raw)) : 1; + } + default: + return DEFAULT_CUSTOM_LANE_RESUME_CONCURRENCY; + } +} + +export function resolveSessionSuspensionReason(reason: FailoverReason): SessionSuspensionReason { + if (reason === "billing") { + return "manual"; + } + if (reason === "rate_limit") { + return "quota_exhausted"; + } + return "circuit_open"; +} + +function scheduleLaneAutoResume(laneId: string, delayMs: number, resumeConcurrency: number) { + const existing = laneResumeTimers.get(laneId); + if (existing) { + clearTimeout(existing); + } + const timer = setTimeout(() => { + laneResumeTimers.delete(laneId); + setCommandLaneConcurrency(laneId, resumeConcurrency); + log.info("auto-resumed lane after suspension TTL", { + laneId, + delayMs, + resumeConcurrency, + }); + }, delayMs); + if (typeof timer.unref === "function") { + timer.unref(); + } + laneResumeTimers.set(laneId, timer); +} + +export function cancelLaneAutoResume(laneId: string) { + const existing = laneResumeTimers.get(laneId); + if (existing) { + clearTimeout(existing); + laneResumeTimers.delete(laneId); + } +} + +export async function suspendSession(params: { + cfg: OpenClawConfig | undefined; + agentDir?: string; + sessionId: string; + laneId?: string; + reason: 
SessionSuspensionReason; + failedProvider: string; + failedModel: string; + summary?: string; + ttlMs?: number; +}) { + if (!params.cfg) { + return; + } + + const { sessionKey, storePath } = resolveStoredSessionKeyForSessionId({ + cfg: params.cfg, + sessionId: params.sessionId, + agentId: params.agentDir ? path.basename(params.agentDir) : undefined, + }); + + if (!sessionKey) { + return; + } + + const ttlMs = params.ttlMs ?? DEFAULT_QUOTA_SUSPENSION_RESUME_MS; + const now = Date.now(); + + try { + await updateSessionStoreEntry({ + storePath, + sessionKey, + update: async () => ({ + quotaSuspension: { + schemaVersion: 1, + suspendedAt: now, + reason: params.reason, + failedProvider: params.failedProvider, + failedModel: params.failedModel, + summary: params.summary, + laneId: params.laneId, + expectedResumeBy: now + ttlMs, + state: "suspended", + }, + }), + }); + } catch (err) { + log.warn("failed to persist quota suspension; not throttling lane", { + sessionId: params.sessionId, + laneId: params.laneId, + error: err instanceof Error ? err.message : String(err), + }); + return; + } + + if (params.laneId) { + setCommandLaneConcurrency(params.laneId, 0); + scheduleLaneAutoResume( + params.laneId, + ttlMs, + resolveLaneResumeConcurrency(params.cfg, params.laneId), + ); + } +} + +export const __testing = { + resolveLaneResumeConcurrency, + resolveSessionSuspensionReason, +} as const; diff --git a/src/auto-reply/handoff-summarizer.ts b/src/auto-reply/handoff-summarizer.ts new file mode 100644 index 00000000000..0a063e2ca04 --- /dev/null +++ b/src/auto-reply/handoff-summarizer.ts @@ -0,0 +1,43 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; + +export interface HandoffSnapshot { + summary: string; + activeSubagents: Array<{ + sessionId: string; + role?: string; + lastStatus?: string; + }>; +} + +/** + * Builds the recovery briefing injected as the first user-side turn after a + * model failover. 
The user role is used (not assistant) so the new model + * treats the content as input rather than its own prior output. + */ +export function buildHierarchyReinforcementMessage(snapshot: HandoffSnapshot): AgentMessage { + const subagentReport = snapshot.activeSubagents + .map((s) => `- Subagent ${s.sessionId} (${s.role ?? "leaf"}): ${s.lastStatus ?? "running"}`) + .join("\n"); + + const content = [ + "[SYSTEM HANDOFF] The previous model is no longer active and a fallback model is now active.", + "You are the new LEADER (Orchestrator). Do not perform tasks already delegated to subordinates.", + "", + "ACTIVE SUBORDINATE UNITS:", + subagentReport || "None active.", + "", + "CURRENT STATE SUMMARY:", + snapshot.summary, + "", + "INSTRUCTIONS:", + "1. Review the state and subordinate reports.", + "2. Provide strategic guidance and commands to subordinates.", + "3. Do not repeat work already performed by subordinates.", + ].join("\n"); + + return { + role: "user", + content, + timestamp: Date.now(), + }; +} diff --git a/src/config/sessions/store-load.ts b/src/config/sessions/store-load.ts index 131f8b74331..05d8a6aa9dc 100644 --- a/src/config/sessions/store-load.ts +++ b/src/config/sessions/store-load.ts @@ -153,27 +153,29 @@ export function loadSessionStore( if (opts.runMaintenance) { const maintenance = opts.maintenanceConfig ?? resolveMaintenanceConfig(); const beforeCount = Object.keys(store).length; + let pruned = 0; + let capped = 0; if (maintenance.mode === "enforce" && beforeCount > maintenance.maxEntries) { - const pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, { log: false }); + pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, { log: false }); const countAfterPrune = Object.keys(store).length; - const capped = shouldRunSessionEntryMaintenance({ + capped = shouldRunSessionEntryMaintenance({ entryCount: countAfterPrune, maxEntries: maintenance.maxEntries, }) ? 
capEntryCount(store, maintenance.maxEntries, { log: false }) : 0; - const afterCount = Object.keys(store).length; - if (pruned > 0 || capped > 0) { - serializedFromDisk = undefined; - log.info("applied load-time maintenance to oversized session store", { - storePath, - before: beforeCount, - after: afterCount, - pruned, - capped, - maxEntries: maintenance.maxEntries, - }); - } + } + const afterCount = Object.keys(store).length; + if (pruned > 0 || capped > 0) { + serializedFromDisk = undefined; + log.info("applied load-time maintenance to session store", { + storePath, + before: beforeCount, + after: afterCount, + pruned, + capped, + maxEntries: maintenance.maxEntries, + }); } } diff --git a/src/config/sessions/store-maintenance.ts b/src/config/sessions/store-maintenance.ts index 39f6526da67..70aca7138a0 100644 --- a/src/config/sessions/store-maintenance.ts +++ b/src/config/sessions/store-maintenance.ts @@ -201,6 +201,63 @@ export function pruneStaleEntries( return pruned; } +export const DEFAULT_QUOTA_SUSPENSION_TTL_MS = 30 * 60 * 1000; // 30 minutes +const QUOTA_SUSPENSION_CLEANUP_FACTOR = 2; // entries beyond N*ttl are deleted outright + +export interface QuotaSuspensionMaintenanceResult { + /** Suspensions whose state was advanced from "suspended" to "resuming" so the next attempt injects a handoff. */ + resumed: Array<{ sessionKey: string; laneId?: string }>; + /** Entries whose `quotaSuspension` field was removed entirely (already-resumed records past 2x TTL). */ + cleared: number; +} + +/** + * Two-stage TTL maintenance for `quotaSuspension` records: + * 1. After `ttlMs`, transition `state: "suspended" → "resuming"` so the next + * attempt for that session sees the resume marker and injects a handoff. + * 2. After `2 * ttlMs`, drop the field entirely (the record has done its job). + * + * Mutates `store` in-place. 
The caller is responsible for translating the + * returned `resumed[]` into in-process lane-concurrency restoration calls, + * which keeps this module free of `process/*` dependencies. + */ +export function pruneQuotaSuspensions(params: { + store: Record<string, SessionEntry>; + now: number; + ttlMs?: number; + log?: boolean; +}): QuotaSuspensionMaintenanceResult { + const ttlMs = params.ttlMs ?? DEFAULT_QUOTA_SUSPENSION_TTL_MS; + const cleanupAfterResumeMs = ttlMs * (QUOTA_SUSPENSION_CLEANUP_FACTOR - 1); + const resumed: Array<{ sessionKey: string; laneId?: string }> = []; + let cleared = 0; + for (const [sessionKey, entry] of Object.entries(params.store)) { + const suspension = entry.quotaSuspension; + if (!suspension) { + continue; + } + const resumeAtMs = suspension.expectedResumeBy ?? suspension.suspendedAt + ttlMs; + const cleanupAtMs = resumeAtMs + cleanupAfterResumeMs; + if (params.now >= cleanupAtMs) { + delete entry.quotaSuspension; + cleared++; + continue; + } + if (suspension.state === "suspended" && params.now >= resumeAtMs) { + entry.quotaSuspension = { ...suspension, state: "resuming" }; + resumed.push({ sessionKey, laneId: suspension.laneId }); + } + } + if ((resumed.length > 0 || cleared > 0) && params.log !== false) { + log.info("processed quota-suspension TTLs", { + resumed: resumed.length, + cleared, + ttlMs, + }); + } + return { resumed, cleared }; +} + function getEntryUpdatedAt(entry?: SessionEntry): number { return entry?.updatedAt ??
Number.NEGATIVE_INFINITY; } diff --git a/src/config/sessions/store.pruning.integration.test.ts b/src/config/sessions/store.pruning.integration.test.ts index 3f42e4dddeb..f9979514387 100644 --- a/src/config/sessions/store.pruning.integration.test.ts +++ b/src/config/sessions/store.pruning.integration.test.ts @@ -20,6 +20,7 @@ import { runSessionsCleanup } from "./cleanup-service.js"; import { clearSessionStoreCacheForTest, loadSessionStore, + runQuotaSuspensionMaintenance, saveSessionStore, updateSessionStore, } from "./store.js"; @@ -716,6 +717,57 @@ describe("Integration: saveSessionStore with pruning", () => { expect(loaded["session-74"]).toBeUndefined(); }); + it("persists quota suspension TTL transitions through writer maintenance", async () => { + const now = Date.now(); + const store: Record<string, SessionEntry> = { + suspended: { + ...makeEntry(now), + quotaSuspension: { + schemaVersion: 1, + suspendedAt: now - 30_000, + expectedResumeBy: now - 1, + state: "suspended", + reason: "quota_exhausted", + failedProvider: "anthropic", + failedModel: "claude-opus-4-6", + laneId: "main", + }, + }, + active: { + ...makeEntry(now), + quotaSuspension: { + schemaVersion: 1, + suspendedAt: now - 61_000, + expectedResumeBy: now - 31_000, + state: "active", + reason: "circuit_open", + failedProvider: "anthropic", + failedModel: "claude-opus-4-6", + laneId: "main", + }, + }, + }; + await fs.writeFile(storePath, JSON.stringify(store), "utf-8"); + + const result = await runQuotaSuspensionMaintenance({ + storePath, + now, + ttlMs: 30_000, + log: false, + }); + + expect(result).toEqual({ resumed: [{ sessionKey: "suspended", laneId: "main" }], cleared: 1 }); + const loaded = loadSessionStore(storePath, { skipCache: true }); + expect(loaded.suspended?.quotaSuspension?.state).toBe("resuming"); + expect(loaded.active?.quotaSuspension).toBeUndefined(); + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< + string, + SessionEntry + >; +
expect(persisted.suspended?.quotaSuspension?.state).toBe("resuming"); + expect(persisted.active?.quotaSuspension).toBeUndefined(); + }); + it("updateSessionStore batches cap-hit maintenance instead of pruning every new session", async () => { const now = Date.now(); const store = Object.fromEntries( diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 1d3d5ec7a14..18af31e2c3d 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -27,8 +27,10 @@ import { resolveMaintenanceConfig } from "./store-maintenance-runtime.js"; import { capEntryCount, getActiveSessionMaintenanceWarning, + pruneQuotaSuspensions, pruneStaleEntries, shouldRunSessionEntryMaintenance, + type QuotaSuspensionMaintenanceResult, type ResolvedSessionMaintenanceConfig, type SessionMaintenanceWarning, } from "./store-maintenance.js"; @@ -451,6 +453,28 @@ export async function updateSessionStore( }); } +export async function runQuotaSuspensionMaintenance(params: { + storePath: string; + now?: number; + ttlMs?: number; + log?: boolean; +}): Promise<QuotaSuspensionMaintenanceResult> { + if (!fs.existsSync(params.storePath)) { + return { resumed: [], cleared: 0 }; + } + return await updateSessionStore( + params.storePath, + (store) => + pruneQuotaSuspensions({ + store, + now: params.now ??
Date.now(), + ttlMs: params.ttlMs, + log: params.log, + }), + { skipMaintenance: true }, + ); +} + function getErrorCode(error: unknown): string | null { if (!error || typeof error !== "object" || !("code" in error)) { return null; diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index c257cc40c96..e9b0a16e8eb 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -147,6 +147,30 @@ export type SubagentRecoveryState = { wedgedReason?: string; }; +export type LaneExecutionState = + | "active" + | "draining" + | "suspended" + | "resuming" + | "circuit_open" + | "failed_handoff"; + +export interface QuotaSuspension { + schemaVersion: 1; + suspendedAt: number; // epoch ms + reason: "quota_exhausted" | "manual" | "circuit_open"; + failedProvider: string; + failedModel: string; + /** Recovery briefing text injected into the next attempt when state === "resuming". */ + summary?: string; + /** Opaque pointer to an external snapshot blob (path/key); not the briefing text itself. */ + snapshotRef?: string; + /** Lane that was set to concurrency=0 when this suspension was issued. */ + laneId?: string; + expectedResumeBy?: number; // Absolute epoch-ms resume deadline (suspendedAt + reaper TTL, e.g. +30min) — a timestamp, not a duration + state: LaneExecutionState; // Current state-machine position; read on the hot path to gate suspend/resume handling +} + export type SessionEntry = { /** * Last delivered heartbeat payload (used to suppress duplicate heartbeat notifications). @@ -192,6 +216,8 @@ export type SessionEntry = { abortedLastRun?: boolean; /** Durable guard state for automatic subagent orphan recovery. */ subagentRecovery?: SubagentRecoveryState; + /** Quota cascade protection and state-aware failover status. */ + quotaSuspension?: QuotaSuspension; /** Timestamp (ms) when the current sessionId first became active. */ sessionStartedAt?: number; /** Timestamp (ms) of the last user/channel interaction that should extend idle lifetime.
*/ diff --git a/src/infra/diagnostic-events.ts b/src/infra/diagnostic-events.ts index b809b1e6bbd..56dc33e2c1a 100644 --- a/src/infra/diagnostic-events.ts +++ b/src/infra/diagnostic-events.ts @@ -46,6 +46,20 @@ export type DiagnosticUsageEvent = DiagnosticBaseEvent & { durationMs?: number; }; +export type DiagnosticFailoverEvent = DiagnosticBaseEvent & { + type: "model.failover"; + sessionId?: string; + sessionKey?: string; + lane?: string; + fromProvider?: string; + fromModel?: string; + toProvider?: string; + toModel?: string; + reason: string; + cascadeDepth?: number; + suspended?: boolean; +}; + export type DiagnosticWebhookReceivedEvent = DiagnosticBaseEvent & { type: "webhook.received"; channel: string; @@ -598,7 +612,8 @@ export type DiagnosticEventPayload = | DiagnosticMemoryPressureEvent | DiagnosticPayloadLargeEvent | DiagnosticLogRecordEvent - | DiagnosticTelemetryExporterEvent; + | DiagnosticTelemetryExporterEvent + | DiagnosticFailoverEvent; export type DiagnosticEventInput = DiagnosticEventPayload extends infer Event ? Event extends DiagnosticEventPayload @@ -845,6 +860,13 @@ export function emitTrustedDiagnosticEvent(event: DiagnosticEventInput) { emitDiagnosticEventWithTrust(event, true); } +export function emitFailoverEvent(event: Omit<DiagnosticFailoverEvent, "type">) { + emitTrustedDiagnosticEvent({ + type: "model.failover", + ...event, + }); +} + export function onInternalDiagnosticEvent(listener: DiagnosticEventListener): () => void { const state = getDiagnosticEventsState(); state.listeners.add(listener); diff --git a/src/logging/diagnostic-stability.ts b/src/logging/diagnostic-stability.ts index 796ae68a4e7..87df183d516 100644 --- a/src/logging/diagnostic-stability.ts +++ b/src/logging/diagnostic-stability.ts @@ -471,6 +471,11 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi record.outcome = event.status; assignReasonCode(record, event.reason ??
event.errorCategory); break; + case "model.failover": + record.provider = event.fromProvider; + record.model = event.fromModel; + assignReasonCode(record, event.reason); + break; } return record; diff --git a/src/plugins/session-entry-slot-keys.ts b/src/plugins/session-entry-slot-keys.ts index 57c37e703fc..90cec341cd7 100644 --- a/src/plugins/session-entry-slot-keys.ts +++ b/src/plugins/session-entry-slot-keys.ts @@ -110,6 +110,7 @@ const SESSION_ENTRY_RESERVED_SLOT_KEY_LIST = [ "systemPromptReport", "pluginDebugEntries", "acp", + "quotaSuspension", ] as const satisfies ReadonlyArray<keyof SessionEntry>; type ReservedSessionEntrySlotKey = Extract< diff --git a/src/process/command-queue.test.ts b/src/process/command-queue.test.ts index ca55e430c80..3115917119e 100644 --- a/src/process/command-queue.test.ts +++ b/src/process/command-queue.test.ts @@ -364,6 +364,35 @@ describe("command queue", () => { } }); + it("keeps work queued while a lane has zero concurrency and drains after resume", async () => { + const lane = `suspended-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`; + setCommandLaneConcurrency(lane, 0); + + let ran = false; + const task = enqueueCommandInLane(lane, async () => { + ran = true; + return "resumed"; + }); + + await Promise.resolve(); + expect(ran).toBe(false); + expect(getCommandLaneSnapshot(lane)).toMatchObject({ + activeCount: 0, + queuedCount: 1, + maxConcurrent: 0, + }); + + setCommandLaneConcurrency(lane, 1); + + await expect(task).resolves.toBe("resumed"); + expect(ran).toBe(true); + expect(getCommandLaneSnapshot(lane)).toMatchObject({ + activeCount: 0, + queuedCount: 0, + maxConcurrent: 1, + }); + }); + it("getCommandLaneSnapshot reports active and queued work for one lane", async () => { const lane = `snapshot-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`; setCommandLaneConcurrency(lane, 1); diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index da9ef7d2a0a..3ddaad72877 100644 ---
a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -311,8 +311,12 @@ export function markGatewayDraining(): void { export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) { const cleaned = normalizeLane(lane); const state = getLaneState(cleaned); - state.maxConcurrent = Math.max(1, Math.floor(maxConcurrent)); - drainLane(cleaned); + const isProbeLane = cleaned.startsWith("auth-probe:") || cleaned.startsWith("session:probe-"); + const minConcurrent = isProbeLane ? 1 : 0; + state.maxConcurrent = Math.max(minConcurrent, Math.floor(maxConcurrent)); + if (state.maxConcurrent > 0) { + drainLane(cleaned); + } } export function enqueueCommandInLane(