diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d5f5485201..70c8186ea0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai - Control UI/sessions: bound the default Sessions tab query to recent activity and fewer rows, avoiding expensive full-history loads while keeping filters editable. Fixes #76050. (#76051) Thanks @Neomail2. - Gateway/channels: cap startup fanout at four channel/account handoffs and recover from Bonjour ciao self-probe races, reducing Windows startup stalls with many Telegram accounts. Fixes #75687. +- Gateway/sessions: keep `sessions.list` polling responsive on large session stores by reusing list-safe session cache/indexes and returning a lightweight compaction checkpoint preview instead of heavyweight summaries. Thanks @rolandrscheel. - CLI/update: treat inherited Gateway service markers as origin hints and only block package replacement when the managed Gateway is still live, so self-updates can stop the service and continue safely. (#75729) Thanks @hxy91819. - Agents/failover: exempt run-level timeouts that fire during tool execution from model fallback, timeout-triggered compaction, and generic timeout payload synthesis. Long `process(poll)`, browser, or `exec` tool calls that exceed `agents.defaults.timeoutSeconds` previously rotated auth profiles, switched to a fallback model, and surfaced a misleading "LLM request timed out" error even though the primary model had already responded. Mirrors the existing `timedOutDuringCompaction` precedent (#46889). Fixes #52147. (#75873) Thanks @simonusa. diff --git a/src/agents/subagent-registry-queries.ts b/src/agents/subagent-registry-queries.ts index 33ac1bceff7..093399c8575 100644 --- a/src/agents/subagent-registry-queries.ts +++ b/src/agents/subagent-registry-queries.ts @@ -50,6 +50,191 @@ export function listRunsForControllerFromRuns( return [...runs.values()].filter((entry) => resolveControllerSessionKey(entry) === key); } +type LatestRunPair = { + runId: string; + entry: SubagentRunRecord; +}; + +export type SubagentRunReadIndex = { + getDisplaySubagentRun(childSessionKey: string): SubagentRunRecord | null; + countActiveDescendantRuns(rootSessionKey: string): number; + runsByControllerSessionKey: ReadonlyMap; +}; + +function rememberLatestRunEntry( + map: Map, + key: string, + entry: SubagentRunRecord, +): void { + const existing = map.get(key); + if (!existing || entry.createdAt > existing.createdAt) { + map.set(key, entry); + } +} + +function rememberLatestRunPair( + map: Map, + key: string, + runId: string, + entry: SubagentRunRecord, +): void { + const existing = map.get(key); + if (!existing || entry.createdAt > existing.entry.createdAt) { + map.set(key, { runId, entry }); + } +} + +export function buildSubagentRunReadIndexFromRuns(params: { + runs: Map; + inMemoryRuns?: Iterable; + now?: number; +}): SubagentRunReadIndex { + const { runs } = params; + const now = params.now ?? Date.now(); + const inMemoryDisplayByChildSessionKey = new Map< + string, + { + latestInMemoryActive: SubagentRunRecord | null; + latestInMemoryEnded: SubagentRunRecord | null; + } + >(); + const latestSnapshotActiveByChildSessionKey = new Map(); + const latestSnapshotEndedByChildSessionKey = new Map(); + const latestRunByChildSessionKey = new Map(); + const runsByControllerSessionKey = new Map(); + const latestRunByRequesterAndChildSessionKey = new Map>(); + const activeDescendantCountBySessionKey = new Map(); + + for (const entry of params.inMemoryRuns ?? []) { + const childSessionKey = entry.childSessionKey.trim(); + if (!childSessionKey) { + continue; + } + let display = inMemoryDisplayByChildSessionKey.get(childSessionKey); + if (!display) { + display = { latestInMemoryActive: null, latestInMemoryEnded: null }; + inMemoryDisplayByChildSessionKey.set(childSessionKey, display); + } + if (hasSubagentRunEnded(entry)) { + if (!display.latestInMemoryEnded || entry.createdAt > display.latestInMemoryEnded.createdAt) { + display.latestInMemoryEnded = entry; + } + continue; + } + if (!display.latestInMemoryActive || entry.createdAt > display.latestInMemoryActive.createdAt) { + display.latestInMemoryActive = entry; + } + } + + for (const [runId, entry] of runs.entries()) { + const childSessionKey = entry.childSessionKey.trim(); + const controllerSessionKey = resolveControllerSessionKey(entry); + if (controllerSessionKey) { + let controllerRuns = runsByControllerSessionKey.get(controllerSessionKey); + if (!controllerRuns) { + controllerRuns = []; + runsByControllerSessionKey.set(controllerSessionKey, controllerRuns); + } + controllerRuns.push(entry); + } + if (!childSessionKey) { + continue; + } + if (isLiveUnendedSubagentRun(entry, now)) { + rememberLatestRunEntry(latestSnapshotActiveByChildSessionKey, childSessionKey, entry); + } else { + rememberLatestRunEntry(latestSnapshotEndedByChildSessionKey, childSessionKey, entry); + } + rememberLatestRunPair(latestRunByChildSessionKey, childSessionKey, runId, entry); + + const requesterSessionKey = entry.requesterSessionKey; + if (!requesterSessionKey) { + continue; + } + let latestByChild = latestRunByRequesterAndChildSessionKey.get(requesterSessionKey); + if (!latestByChild) { + latestByChild = new Map(); + latestRunByRequesterAndChildSessionKey.set(requesterSessionKey, latestByChild); + } + rememberLatestRunPair(latestByChild, childSessionKey, runId, entry); + } + + const getDisplaySubagentRun = (childSessionKey: string): SubagentRunRecord | null => { + const key = childSessionKey.trim(); + if (!key) { + return null; + } + const inMemoryDisplay = inMemoryDisplayByChildSessionKey.get(key); + if (inMemoryDisplay) { + const latestInMemoryEnded = inMemoryDisplay.latestInMemoryEnded; + const latestInMemoryActive = inMemoryDisplay.latestInMemoryActive; + if (latestInMemoryEnded || latestInMemoryActive) { + if ( + latestInMemoryEnded && + (!latestInMemoryActive || latestInMemoryEnded.createdAt > latestInMemoryActive.createdAt) + ) { + return latestInMemoryEnded; + } + return latestInMemoryActive ?? latestInMemoryEnded; + } + } + return ( + latestSnapshotActiveByChildSessionKey.get(key) ?? + latestSnapshotEndedByChildSessionKey.get(key) ?? + null + ); + }; + + const countActiveDescendantRuns = (rootSessionKey: string): number => { + const root = rootSessionKey.trim(); + if (!root) { + return 0; + } + if (activeDescendantCountBySessionKey.has(root)) { + return activeDescendantCountBySessionKey.get(root) ?? 0; + } + let count = 0; + const pending = [root]; + const visited = new Set([root]); + for (let index = 0; index < pending.length; index += 1) { + const requester = pending[index]; + if (!requester) { + continue; + } + const latestByChild = latestRunByRequesterAndChildSessionKey.get(requester); + if (!latestByChild) { + continue; + } + for (const [childSessionKey, pair] of latestByChild.entries()) { + const latestForChildSession = latestRunByChildSessionKey.get(childSessionKey); + if ( + !latestForChildSession || + latestForChildSession.runId !== pair.runId || + latestForChildSession.entry.requesterSessionKey !== requester + ) { + continue; + } + if (isLiveUnendedSubagentRun(pair.entry, now)) { + count += 1; + } + if (!childSessionKey || visited.has(childSessionKey)) { + continue; + } + visited.add(childSessionKey); + pending.push(childSessionKey); + } + } + activeDescendantCountBySessionKey.set(root, count); + return count; + }; + + return { + getDisplaySubagentRun, + countActiveDescendantRuns, + runsByControllerSessionKey, + }; +} + function findLatestRunForChildSession( runs: Map, childSessionKey: string, diff --git a/src/agents/subagent-registry-read.ts b/src/agents/subagent-registry-read.ts index 5a839e3c034..85388a64e7e 100644 --- a/src/agents/subagent-registry-read.ts +++ b/src/agents/subagent-registry-read.ts @@ -1,10 +1,12 @@ import { getAgentRunContext } from "../infra/agent-events.js"; import { subagentRuns } from "./subagent-registry-memory.js"; import { + buildSubagentRunReadIndexFromRuns, countActiveDescendantRunsFromRuns, getSubagentRunByChildSessionKeyFromRuns, listDescendantRunsForRequesterFromRuns, listRunsForControllerFromRuns, + type SubagentRunReadIndex, } from "./subagent-registry-queries.js"; import { getSubagentRunsSnapshotForRead } from "./subagent-registry-state.js"; import type { SubagentRunRecord } from "./subagent-registry.types.js"; @@ -20,6 +22,14 @@ export { resolveSubagentSessionStatus, } from "./subagent-session-metrics.js"; +export function buildSubagentRunReadIndex(now = Date.now()): SubagentRunReadIndex { + return buildSubagentRunReadIndexFromRuns({ + runs: getSubagentRunsSnapshotForRead(subagentRuns), + inMemoryRuns: subagentRuns.values(), + now, + }); +} + export function listSubagentRunsForController(controllerSessionKey: string): SubagentRunRecord[] { return listRunsForControllerFromRuns( getSubagentRunsSnapshotForRead(subagentRuns), diff --git a/src/gateway/server.sessions.compaction.test.ts b/src/gateway/server.sessions.compaction.test.ts index e8eb5bd4dd9..736410939a6 100644 --- a/src/gateway/server.sessions.compaction.test.ts +++ b/src/gateway/server.sessions.compaction.test.ts @@ -23,6 +23,7 @@ const { createSessionStoreDir, openClient } = setupGatewaySessionsTestHarness(); test("sessions.compaction.* lists checkpoints and branches or restores from pre-compaction snapshots", async () => { const { dir, storePath } = await createSessionStoreDir(); const fixture = await createCheckpointFixture(dir); + const checkpointCreatedAt = Date.now(); const { SessionManager } = await getSessionManagerModule(); await writeSessionStore({ entries: { @@ -33,7 +34,7 @@ test("sessions.compaction.* lists checkpoints and branches or restores from pre- checkpointId: "checkpoint-1", sessionKey: "agent:main:main", sessionId: fixture.sessionId, - createdAt: Date.now(), + createdAt: checkpointCreatedAt, reason: "manual", tokensBefore: 123, tokensAfter: 45, @@ -64,7 +65,9 @@ test("sessions.compaction.* lists checkpoints and branches or restores from pre- compactionCheckpointCount?: number; latestCompactionCheckpoint?: { checkpointId: string; + createdAt: number; reason: string; + summary?: string; tokensBefore?: number; tokensAfter?: number; }; @@ -75,8 +78,11 @@ test("sessions.compaction.* lists checkpoints and branches or restores from pre- (session) => session.key === "agent:main:main", ); expect(main?.compactionCheckpointCount).toBe(1); - expect(main?.latestCompactionCheckpoint?.checkpointId).toBe("checkpoint-1"); - expect(main?.latestCompactionCheckpoint?.reason).toBe("manual"); + expect(main?.latestCompactionCheckpoint).toEqual({ + checkpointId: "checkpoint-1", + createdAt: checkpointCreatedAt, + reason: "manual", + }); const listedCheckpoints = await rpcReq<{ ok: true; diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index 7753e09303f..ba1b500d86e 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -27,6 +27,7 @@ import { resolveThinkingDefault, } from "../agents/model-selection.js"; import { + buildSubagentRunReadIndex, countActiveDescendantRuns, getSessionDisplaySubagentRunByChildSessionKey, getSubagentSessionRuntimeMs, @@ -267,6 +268,33 @@ function resolveLatestCompactionCheckpoint( ); } +function buildCompactionCheckpointPreview( + checkpoint: NonNullable[number] | undefined, +): GatewaySessionRow["latestCompactionCheckpoint"] { + if (!checkpoint) { + return undefined; + } + const checkpointId = normalizeOptionalString(checkpoint.checkpointId); + const createdAt = checkpoint.createdAt; + const reason = checkpoint.reason; + if (!checkpointId || typeof createdAt !== "number" || !Number.isFinite(createdAt)) { + return undefined; + } + if ( + reason !== "manual" && + reason !== "auto-threshold" && + reason !== "overflow-retry" && + reason !== "timeout-retry" + ) { + return undefined; + } + return { + checkpointId, + createdAt, + reason, + }; +} + function resolveEstimatedSessionCostUsd(params: { cfg: OpenClawConfig; provider?: string; @@ -341,17 +369,29 @@ function shouldKeepStoreOnlyChildLink(entry: SessionEntry, now: number): boolean ); } +type SessionListRowContext = { + subagentRuns: ReturnType; + storeChildSessionsByKey: Map; +}; + function resolveRuntimeChildSessionKeys( controllerSessionKey: string, now = Date.now(), + subagentRuns?: SessionListRowContext["subagentRuns"], ): string[] | undefined { const childSessionKeys = new Set(); - for (const entry of listSubagentRunsForController(controllerSessionKey)) { + const controllerKey = controllerSessionKey.trim(); + const runs = + subagentRuns?.runsByControllerSessionKey.get(controllerKey) ?? + listSubagentRunsForController(controllerSessionKey); + for (const entry of runs) { const childSessionKey = normalizeOptionalString(entry.childSessionKey); if (!childSessionKey) { continue; } - const latest = getSessionDisplaySubagentRunByChildSessionKey(childSessionKey); + const latest = subagentRuns + ? subagentRuns.getDisplaySubagentRun(childSessionKey) + : getSessionDisplaySubagentRunByChildSessionKey(childSessionKey); if (!latest) { continue; } @@ -363,7 +403,9 @@ function resolveRuntimeChildSessionKeys( } if ( !shouldKeepSubagentRunChildLink(latest, { - activeDescendants: countActiveDescendantRuns(childSessionKey), + activeDescendants: subagentRuns + ? subagentRuns.countActiveDescendantRuns(childSessionKey) + : countActiveDescendantRuns(childSessionKey), now, }) ) { @@ -393,6 +435,7 @@ function addChildSessionKey( function buildStoreChildSessionIndex( store: Record, now = Date.now(), + subagentRuns?: SessionListRowContext["subagentRuns"], ): Map { const childSessionsByKey = new Map(); for (const [key, entry] of Object.entries(store)) { @@ -406,7 +449,9 @@ function buildStoreChildSessionIndex( if (parentKeys.length === 0) { continue; } - const latest = getSessionDisplaySubagentRunByChildSessionKey(key); + const latest = subagentRuns + ? subagentRuns.getDisplaySubagentRun(key) + : getSessionDisplaySubagentRunByChildSessionKey(key); let latestControllerSessionKey: string | undefined; if (latest) { latestControllerSessionKey = @@ -414,7 +459,9 @@ function buildStoreChildSessionIndex( normalizeOptionalString(latest.requesterSessionKey); if ( !shouldKeepSubagentRunChildLink(latest, { - activeDescendants: countActiveDescendantRuns(key), + activeDescendants: subagentRuns + ? subagentRuns.countActiveDescendantRuns(key) + : countActiveDescendantRuns(key), now, }) ) { @@ -433,6 +480,17 @@ function buildStoreChildSessionIndex( return childSessionsByKey; } +function buildSessionListRowContext(params: { + store: Record; + now: number; +}): SessionListRowContext { + const subagentRuns = buildSubagentRunReadIndex(params.now); + return { + subagentRuns, + storeChildSessionsByKey: buildStoreChildSessionIndex(params.store, params.now, subagentRuns), + }; +} + function mergeChildSessionKeys( runtimeChildSessions: string[] | undefined, storeChildSessions: string[] | undefined, @@ -450,9 +508,16 @@ function resolveChildSessionKeys( controllerSessionKey: string, store: Record, now = Date.now(), + subagentRuns?: SessionListRowContext["subagentRuns"], ): string[] | undefined { - const runtimeChildSessions = resolveRuntimeChildSessionKeys(controllerSessionKey, now); - const storeChildSessions = buildStoreChildSessionIndex(store, now).get(controllerSessionKey); + const runtimeChildSessions = resolveRuntimeChildSessionKeys( + controllerSessionKey, + now, + subagentRuns, + ); + const storeChildSessions = buildStoreChildSessionIndex(store, now, subagentRuns).get( + controllerSessionKey, + ); return mergeChildSessionKeys(runtimeChildSessions, storeChildSessions); } @@ -1364,6 +1429,7 @@ export function buildGatewaySessionRow(params: { includeLastMessage?: boolean; transcriptUsageMaxBytes?: number; storeChildSessionsByKey?: Map; + rowContext?: SessionListRowContext; }): GatewaySessionRow { const { cfg, storePath, store, key, entry } = params; const now = params.now ?? Date.now(); @@ -1393,7 +1459,10 @@ export function buildGatewaySessionRow(params: { const deliveryFields = normalizeSessionDeliveryFields(entry); const parsedAgent = parseAgentSessionKey(key); const sessionAgentId = normalizeAgentId(parsedAgent?.agentId ?? resolveDefaultAgentId(cfg)); - const subagentRun = getSessionDisplaySubagentRunByChildSessionKey(key); + const rowContext = params.rowContext; + const subagentRun = rowContext + ? rowContext.subagentRuns.getDisplaySubagentRun(key) + : getSessionDisplaySubagentRunByChildSessionKey(key); const subagentOwner = normalizeOptionalString(subagentRun?.controllerSessionKey) || normalizeOptionalString(subagentRun?.requesterSessionKey); @@ -1501,11 +1570,13 @@ export function buildGatewaySessionRow(params: { : transcriptUsage?.totalTokensFresh === true; const childSessions = params.storeChildSessionsByKey ? mergeChildSessionKeys( - resolveRuntimeChildSessionKeys(key, now), + resolveRuntimeChildSessionKeys(key, now, rowContext?.subagentRuns), params.storeChildSessionsByKey.get(key), ) - : resolveChildSessionKeys(key, store, now); - const latestCompactionCheckpoint = resolveLatestCompactionCheckpoint(entry); + : resolveChildSessionKeys(key, store, now, rowContext?.subagentRuns); + const latestCompactionCheckpoint = buildCompactionCheckpointPreview( + resolveLatestCompactionCheckpoint(entry), + ); const agentRuntime = resolveAgentRuntimeMetadata(cfg, sessionAgentId); const selectedOrRuntimeModelProvider = selectedModel?.provider ?? modelProvider; const selectedOrRuntimeModel = selectedModel?.model ?? model; @@ -1800,7 +1871,7 @@ export function listSessionsFromStore(params: { const now = Date.now(); const sessionListTranscriptUsageMaxBytes = 64 * 1024; const sessionListTranscriptFieldRows = 100; - const storeChildSessionsByKey = buildStoreChildSessionIndex(store, now); + const rowContext = buildSessionListRowContext({ store, now }); const includeDerivedTitles = opts.includeDerivedTitles === true; const includeLastMessage = opts.includeLastMessage === true; @@ -1819,7 +1890,8 @@ export function listSessionsFromStore(params: { includeDerivedTitles: includeTranscriptFields && includeDerivedTitles, includeLastMessage: includeTranscriptFields && includeLastMessage, transcriptUsageMaxBytes: sessionListTranscriptUsageMaxBytes, - storeChildSessionsByKey, + storeChildSessionsByKey: rowContext.storeChildSessionsByKey, + rowContext, }); }); @@ -1853,7 +1925,7 @@ export async function listSessionsFromStoreAsync(params: { const now = Date.now(); const sessionListTranscriptUsageMaxBytes = 64 * 1024; const sessionListTranscriptFieldRows = 100; - const storeChildSessionsByKey = buildStoreChildSessionIndex(store, now); + const rowContext = buildSessionListRowContext({ store, now }); const includeDerivedTitles = opts.includeDerivedTitles === true; const includeLastMessage = opts.includeLastMessage === true; @@ -1874,7 +1946,8 @@ export async function listSessionsFromStoreAsync(params: { includeDerivedTitles: false, includeLastMessage: false, transcriptUsageMaxBytes: sessionListTranscriptUsageMaxBytes, - storeChildSessionsByKey, + storeChildSessionsByKey: rowContext.storeChildSessionsByKey, + rowContext, }); if ( entry?.sessionId && diff --git a/src/gateway/session-utils.types.ts b/src/gateway/session-utils.types.ts index f270ed2bb01..a36722af81d 100644 --- a/src/gateway/session-utils.types.ts +++ b/src/gateway/session-utils.types.ts @@ -27,6 +27,11 @@ export type SessionRunStatus = "running" | "done" | "failed" | "killed" | "timeo type SubagentRunState = "active" | "interrupted" | "historical"; +export type SessionCompactionCheckpointPreview = Pick< + SessionCompactionCheckpoint, + "checkpointId" | "createdAt" | "reason" +>; + export type GatewaySessionRow = { key: string; spawnedBy?: string; @@ -84,7 +89,7 @@ export type GatewaySessionRow = { lastAccountId?: string; lastThreadId?: SessionEntry["lastThreadId"]; compactionCheckpointCount?: number; - latestCompactionCheckpoint?: SessionCompactionCheckpoint; + latestCompactionCheckpoint?: SessionCompactionCheckpointPreview; pluginExtensions?: PluginSessionExtensionProjection[]; }; diff --git a/ui/src/ui/types.ts b/ui/src/ui/types.ts index 51f32e8cef1..63f9fa5702a 100644 --- a/ui/src/ui/types.ts +++ b/ui/src/ui/types.ts @@ -409,6 +409,11 @@ export type SessionCompactionCheckpoint = { postCompaction: SessionCompactionTranscriptReference; }; +export type SessionCompactionCheckpointPreview = Pick< + SessionCompactionCheckpoint, + "checkpointId" | "createdAt" | "reason" +>; + export type GatewaySessionRow = { key: string; spawnedBy?: string; @@ -447,7 +452,7 @@ export type GatewaySessionRow = { agentRuntime?: GatewayAgentRuntime; contextTokens?: number; compactionCheckpointCount?: number; - latestCompactionCheckpoint?: SessionCompactionCheckpoint; + latestCompactionCheckpoint?: SessionCompactionCheckpointPreview; }; export type SessionsListResult = SessionsListResultBase;