fix(sessions): keep list polling lightweight (#76090)

Co-authored-by: rolandrscheel <20336324+rolandrscheel@users.noreply.github.com>
This commit is contained in:
Peter Steinberger
2026-05-02 15:29:29 +01:00
parent d228b0dc58
commit 2b37b383ed
7 changed files with 305 additions and 20 deletions

View File

@@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai
- Control UI/sessions: bound the default Sessions tab query to recent activity and fewer rows, avoiding expensive full-history loads while keeping filters editable. Fixes #76050. (#76051) Thanks @Neomail2.
- Gateway/channels: cap startup fanout at four channel/account handoffs and recover from Bonjour ciao self-probe races, reducing Windows startup stalls with many Telegram accounts. Fixes #75687.
- Gateway/sessions: keep `sessions.list` polling responsive on large session stores by reusing list-safe session cache/indexes and returning a lightweight compaction checkpoint preview instead of heavyweight summaries. Thanks @rolandrscheel.
- CLI/update: treat inherited Gateway service markers as origin hints and only block package replacement when the managed Gateway is still live, so self-updates can stop the service and continue safely. (#75729) Thanks @hxy91819.
- Agents/failover: exempt run-level timeouts that fire during tool execution from model fallback, timeout-triggered compaction, and generic timeout payload synthesis. Long `process(poll)`, browser, or `exec` tool calls that exceed `agents.defaults.timeoutSeconds` previously rotated auth profiles, switched to a fallback model, and surfaced a misleading "LLM request timed out" error even though the primary model had already responded. Mirrors the existing `timedOutDuringCompaction` precedent (#46889). Fixes #52147. (#75873) Thanks @simonusa.

View File

@@ -50,6 +50,191 @@ export function listRunsForControllerFromRuns(
return [...runs.values()].filter((entry) => resolveControllerSessionKey(entry) === key);
}
type LatestRunPair = {
runId: string;
entry: SubagentRunRecord;
};
export type SubagentRunReadIndex = {
getDisplaySubagentRun(childSessionKey: string): SubagentRunRecord | null;
countActiveDescendantRuns(rootSessionKey: string): number;
runsByControllerSessionKey: ReadonlyMap<string, readonly SubagentRunRecord[]>;
};
function rememberLatestRunEntry(
map: Map<string, SubagentRunRecord>,
key: string,
entry: SubagentRunRecord,
): void {
const existing = map.get(key);
if (!existing || entry.createdAt > existing.createdAt) {
map.set(key, entry);
}
}
function rememberLatestRunPair(
map: Map<string, LatestRunPair>,
key: string,
runId: string,
entry: SubagentRunRecord,
): void {
const existing = map.get(key);
if (!existing || entry.createdAt > existing.entry.createdAt) {
map.set(key, { runId, entry });
}
}
export function buildSubagentRunReadIndexFromRuns(params: {
runs: Map<string, SubagentRunRecord>;
inMemoryRuns?: Iterable<SubagentRunRecord>;
now?: number;
}): SubagentRunReadIndex {
const { runs } = params;
const now = params.now ?? Date.now();
const inMemoryDisplayByChildSessionKey = new Map<
string,
{
latestInMemoryActive: SubagentRunRecord | null;
latestInMemoryEnded: SubagentRunRecord | null;
}
>();
const latestSnapshotActiveByChildSessionKey = new Map<string, SubagentRunRecord>();
const latestSnapshotEndedByChildSessionKey = new Map<string, SubagentRunRecord>();
const latestRunByChildSessionKey = new Map<string, LatestRunPair>();
const runsByControllerSessionKey = new Map<string, SubagentRunRecord[]>();
const latestRunByRequesterAndChildSessionKey = new Map<string, Map<string, LatestRunPair>>();
const activeDescendantCountBySessionKey = new Map<string, number>();
for (const entry of params.inMemoryRuns ?? []) {
const childSessionKey = entry.childSessionKey.trim();
if (!childSessionKey) {
continue;
}
let display = inMemoryDisplayByChildSessionKey.get(childSessionKey);
if (!display) {
display = { latestInMemoryActive: null, latestInMemoryEnded: null };
inMemoryDisplayByChildSessionKey.set(childSessionKey, display);
}
if (hasSubagentRunEnded(entry)) {
if (!display.latestInMemoryEnded || entry.createdAt > display.latestInMemoryEnded.createdAt) {
display.latestInMemoryEnded = entry;
}
continue;
}
if (!display.latestInMemoryActive || entry.createdAt > display.latestInMemoryActive.createdAt) {
display.latestInMemoryActive = entry;
}
}
for (const [runId, entry] of runs.entries()) {
const childSessionKey = entry.childSessionKey.trim();
const controllerSessionKey = resolveControllerSessionKey(entry);
if (controllerSessionKey) {
let controllerRuns = runsByControllerSessionKey.get(controllerSessionKey);
if (!controllerRuns) {
controllerRuns = [];
runsByControllerSessionKey.set(controllerSessionKey, controllerRuns);
}
controllerRuns.push(entry);
}
if (!childSessionKey) {
continue;
}
if (isLiveUnendedSubagentRun(entry, now)) {
rememberLatestRunEntry(latestSnapshotActiveByChildSessionKey, childSessionKey, entry);
} else {
rememberLatestRunEntry(latestSnapshotEndedByChildSessionKey, childSessionKey, entry);
}
rememberLatestRunPair(latestRunByChildSessionKey, childSessionKey, runId, entry);
const requesterSessionKey = entry.requesterSessionKey;
if (!requesterSessionKey) {
continue;
}
let latestByChild = latestRunByRequesterAndChildSessionKey.get(requesterSessionKey);
if (!latestByChild) {
latestByChild = new Map<string, LatestRunPair>();
latestRunByRequesterAndChildSessionKey.set(requesterSessionKey, latestByChild);
}
rememberLatestRunPair(latestByChild, childSessionKey, runId, entry);
}
const getDisplaySubagentRun = (childSessionKey: string): SubagentRunRecord | null => {
const key = childSessionKey.trim();
if (!key) {
return null;
}
const inMemoryDisplay = inMemoryDisplayByChildSessionKey.get(key);
if (inMemoryDisplay) {
const latestInMemoryEnded = inMemoryDisplay.latestInMemoryEnded;
const latestInMemoryActive = inMemoryDisplay.latestInMemoryActive;
if (latestInMemoryEnded || latestInMemoryActive) {
if (
latestInMemoryEnded &&
(!latestInMemoryActive || latestInMemoryEnded.createdAt > latestInMemoryActive.createdAt)
) {
return latestInMemoryEnded;
}
return latestInMemoryActive ?? latestInMemoryEnded;
}
}
return (
latestSnapshotActiveByChildSessionKey.get(key) ??
latestSnapshotEndedByChildSessionKey.get(key) ??
null
);
};
const countActiveDescendantRuns = (rootSessionKey: string): number => {
const root = rootSessionKey.trim();
if (!root) {
return 0;
}
if (activeDescendantCountBySessionKey.has(root)) {
return activeDescendantCountBySessionKey.get(root) ?? 0;
}
let count = 0;
const pending = [root];
const visited = new Set<string>([root]);
for (let index = 0; index < pending.length; index += 1) {
const requester = pending[index];
if (!requester) {
continue;
}
const latestByChild = latestRunByRequesterAndChildSessionKey.get(requester);
if (!latestByChild) {
continue;
}
for (const [childSessionKey, pair] of latestByChild.entries()) {
const latestForChildSession = latestRunByChildSessionKey.get(childSessionKey);
if (
!latestForChildSession ||
latestForChildSession.runId !== pair.runId ||
latestForChildSession.entry.requesterSessionKey !== requester
) {
continue;
}
if (isLiveUnendedSubagentRun(pair.entry, now)) {
count += 1;
}
if (!childSessionKey || visited.has(childSessionKey)) {
continue;
}
visited.add(childSessionKey);
pending.push(childSessionKey);
}
}
activeDescendantCountBySessionKey.set(root, count);
return count;
};
return {
getDisplaySubagentRun,
countActiveDescendantRuns,
runsByControllerSessionKey,
};
}
function findLatestRunForChildSession(
runs: Map<string, SubagentRunRecord>,
childSessionKey: string,

View File

@@ -1,10 +1,12 @@
import { getAgentRunContext } from "../infra/agent-events.js";
import { subagentRuns } from "./subagent-registry-memory.js";
import {
buildSubagentRunReadIndexFromRuns,
countActiveDescendantRunsFromRuns,
getSubagentRunByChildSessionKeyFromRuns,
listDescendantRunsForRequesterFromRuns,
listRunsForControllerFromRuns,
type SubagentRunReadIndex,
} from "./subagent-registry-queries.js";
import { getSubagentRunsSnapshotForRead } from "./subagent-registry-state.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
@@ -20,6 +22,14 @@ export {
resolveSubagentSessionStatus,
} from "./subagent-session-metrics.js";
export function buildSubagentRunReadIndex(now = Date.now()): SubagentRunReadIndex {
return buildSubagentRunReadIndexFromRuns({
runs: getSubagentRunsSnapshotForRead(subagentRuns),
inMemoryRuns: subagentRuns.values(),
now,
});
}
export function listSubagentRunsForController(controllerSessionKey: string): SubagentRunRecord[] {
return listRunsForControllerFromRuns(
getSubagentRunsSnapshotForRead(subagentRuns),

View File

@@ -23,6 +23,7 @@ const { createSessionStoreDir, openClient } = setupGatewaySessionsTestHarness();
test("sessions.compaction.* lists checkpoints and branches or restores from pre-compaction snapshots", async () => {
const { dir, storePath } = await createSessionStoreDir();
const fixture = await createCheckpointFixture(dir);
const checkpointCreatedAt = Date.now();
const { SessionManager } = await getSessionManagerModule();
await writeSessionStore({
entries: {
@@ -33,7 +34,7 @@ test("sessions.compaction.* lists checkpoints and branches or restores from pre-
checkpointId: "checkpoint-1",
sessionKey: "agent:main:main",
sessionId: fixture.sessionId,
createdAt: Date.now(),
createdAt: checkpointCreatedAt,
reason: "manual",
tokensBefore: 123,
tokensAfter: 45,
@@ -64,7 +65,9 @@ test("sessions.compaction.* lists checkpoints and branches or restores from pre-
compactionCheckpointCount?: number;
latestCompactionCheckpoint?: {
checkpointId: string;
createdAt: number;
reason: string;
summary?: string;
tokensBefore?: number;
tokensAfter?: number;
};
@@ -75,8 +78,11 @@ test("sessions.compaction.* lists checkpoints and branches or restores from pre-
(session) => session.key === "agent:main:main",
);
expect(main?.compactionCheckpointCount).toBe(1);
expect(main?.latestCompactionCheckpoint?.checkpointId).toBe("checkpoint-1");
expect(main?.latestCompactionCheckpoint?.reason).toBe("manual");
expect(main?.latestCompactionCheckpoint).toEqual({
checkpointId: "checkpoint-1",
createdAt: checkpointCreatedAt,
reason: "manual",
});
const listedCheckpoints = await rpcReq<{
ok: true;

View File

@@ -27,6 +27,7 @@ import {
resolveThinkingDefault,
} from "../agents/model-selection.js";
import {
buildSubagentRunReadIndex,
countActiveDescendantRuns,
getSessionDisplaySubagentRunByChildSessionKey,
getSubagentSessionRuntimeMs,
@@ -267,6 +268,33 @@ function resolveLatestCompactionCheckpoint(
);
}
function buildCompactionCheckpointPreview(
checkpoint: NonNullable<SessionEntry["compactionCheckpoints"]>[number] | undefined,
): GatewaySessionRow["latestCompactionCheckpoint"] {
if (!checkpoint) {
return undefined;
}
const checkpointId = normalizeOptionalString(checkpoint.checkpointId);
const createdAt = checkpoint.createdAt;
const reason = checkpoint.reason;
if (!checkpointId || typeof createdAt !== "number" || !Number.isFinite(createdAt)) {
return undefined;
}
if (
reason !== "manual" &&
reason !== "auto-threshold" &&
reason !== "overflow-retry" &&
reason !== "timeout-retry"
) {
return undefined;
}
return {
checkpointId,
createdAt,
reason,
};
}
function resolveEstimatedSessionCostUsd(params: {
cfg: OpenClawConfig;
provider?: string;
@@ -341,17 +369,29 @@ function shouldKeepStoreOnlyChildLink(entry: SessionEntry, now: number): boolean
);
}
type SessionListRowContext = {
subagentRuns: ReturnType<typeof buildSubagentRunReadIndex>;
storeChildSessionsByKey: Map<string, string[]>;
};
function resolveRuntimeChildSessionKeys(
controllerSessionKey: string,
now = Date.now(),
subagentRuns?: SessionListRowContext["subagentRuns"],
): string[] | undefined {
const childSessionKeys = new Set<string>();
for (const entry of listSubagentRunsForController(controllerSessionKey)) {
const controllerKey = controllerSessionKey.trim();
const runs =
subagentRuns?.runsByControllerSessionKey.get(controllerKey) ??
listSubagentRunsForController(controllerSessionKey);
for (const entry of runs) {
const childSessionKey = normalizeOptionalString(entry.childSessionKey);
if (!childSessionKey) {
continue;
}
const latest = getSessionDisplaySubagentRunByChildSessionKey(childSessionKey);
const latest = subagentRuns
? subagentRuns.getDisplaySubagentRun(childSessionKey)
: getSessionDisplaySubagentRunByChildSessionKey(childSessionKey);
if (!latest) {
continue;
}
@@ -363,7 +403,9 @@ function resolveRuntimeChildSessionKeys(
}
if (
!shouldKeepSubagentRunChildLink(latest, {
activeDescendants: countActiveDescendantRuns(childSessionKey),
activeDescendants: subagentRuns
? subagentRuns.countActiveDescendantRuns(childSessionKey)
: countActiveDescendantRuns(childSessionKey),
now,
})
) {
@@ -393,6 +435,7 @@ function addChildSessionKey(
function buildStoreChildSessionIndex(
store: Record<string, SessionEntry>,
now = Date.now(),
subagentRuns?: SessionListRowContext["subagentRuns"],
): Map<string, string[]> {
const childSessionsByKey = new Map<string, string[]>();
for (const [key, entry] of Object.entries(store)) {
@@ -406,7 +449,9 @@ function buildStoreChildSessionIndex(
if (parentKeys.length === 0) {
continue;
}
const latest = getSessionDisplaySubagentRunByChildSessionKey(key);
const latest = subagentRuns
? subagentRuns.getDisplaySubagentRun(key)
: getSessionDisplaySubagentRunByChildSessionKey(key);
let latestControllerSessionKey: string | undefined;
if (latest) {
latestControllerSessionKey =
@@ -414,7 +459,9 @@ function buildStoreChildSessionIndex(
normalizeOptionalString(latest.requesterSessionKey);
if (
!shouldKeepSubagentRunChildLink(latest, {
activeDescendants: countActiveDescendantRuns(key),
activeDescendants: subagentRuns
? subagentRuns.countActiveDescendantRuns(key)
: countActiveDescendantRuns(key),
now,
})
) {
@@ -433,6 +480,17 @@ function buildStoreChildSessionIndex(
return childSessionsByKey;
}
function buildSessionListRowContext(params: {
store: Record<string, SessionEntry>;
now: number;
}): SessionListRowContext {
const subagentRuns = buildSubagentRunReadIndex(params.now);
return {
subagentRuns,
storeChildSessionsByKey: buildStoreChildSessionIndex(params.store, params.now, subagentRuns),
};
}
function mergeChildSessionKeys(
runtimeChildSessions: string[] | undefined,
storeChildSessions: string[] | undefined,
@@ -450,9 +508,16 @@ function resolveChildSessionKeys(
controllerSessionKey: string,
store: Record<string, SessionEntry>,
now = Date.now(),
subagentRuns?: SessionListRowContext["subagentRuns"],
): string[] | undefined {
const runtimeChildSessions = resolveRuntimeChildSessionKeys(controllerSessionKey, now);
const storeChildSessions = buildStoreChildSessionIndex(store, now).get(controllerSessionKey);
const runtimeChildSessions = resolveRuntimeChildSessionKeys(
controllerSessionKey,
now,
subagentRuns,
);
const storeChildSessions = buildStoreChildSessionIndex(store, now, subagentRuns).get(
controllerSessionKey,
);
return mergeChildSessionKeys(runtimeChildSessions, storeChildSessions);
}
@@ -1364,6 +1429,7 @@ export function buildGatewaySessionRow(params: {
includeLastMessage?: boolean;
transcriptUsageMaxBytes?: number;
storeChildSessionsByKey?: Map<string, string[]>;
rowContext?: SessionListRowContext;
}): GatewaySessionRow {
const { cfg, storePath, store, key, entry } = params;
const now = params.now ?? Date.now();
@@ -1393,7 +1459,10 @@ export function buildGatewaySessionRow(params: {
const deliveryFields = normalizeSessionDeliveryFields(entry);
const parsedAgent = parseAgentSessionKey(key);
const sessionAgentId = normalizeAgentId(parsedAgent?.agentId ?? resolveDefaultAgentId(cfg));
const subagentRun = getSessionDisplaySubagentRunByChildSessionKey(key);
const rowContext = params.rowContext;
const subagentRun = rowContext
? rowContext.subagentRuns.getDisplaySubagentRun(key)
: getSessionDisplaySubagentRunByChildSessionKey(key);
const subagentOwner =
normalizeOptionalString(subagentRun?.controllerSessionKey) ||
normalizeOptionalString(subagentRun?.requesterSessionKey);
@@ -1501,11 +1570,13 @@ export function buildGatewaySessionRow(params: {
: transcriptUsage?.totalTokensFresh === true;
const childSessions = params.storeChildSessionsByKey
? mergeChildSessionKeys(
resolveRuntimeChildSessionKeys(key, now),
resolveRuntimeChildSessionKeys(key, now, rowContext?.subagentRuns),
params.storeChildSessionsByKey.get(key),
)
: resolveChildSessionKeys(key, store, now);
const latestCompactionCheckpoint = resolveLatestCompactionCheckpoint(entry);
: resolveChildSessionKeys(key, store, now, rowContext?.subagentRuns);
const latestCompactionCheckpoint = buildCompactionCheckpointPreview(
resolveLatestCompactionCheckpoint(entry),
);
const agentRuntime = resolveAgentRuntimeMetadata(cfg, sessionAgentId);
const selectedOrRuntimeModelProvider = selectedModel?.provider ?? modelProvider;
const selectedOrRuntimeModel = selectedModel?.model ?? model;
@@ -1800,7 +1871,7 @@ export function listSessionsFromStore(params: {
const now = Date.now();
const sessionListTranscriptUsageMaxBytes = 64 * 1024;
const sessionListTranscriptFieldRows = 100;
const storeChildSessionsByKey = buildStoreChildSessionIndex(store, now);
const rowContext = buildSessionListRowContext({ store, now });
const includeDerivedTitles = opts.includeDerivedTitles === true;
const includeLastMessage = opts.includeLastMessage === true;
@@ -1819,7 +1890,8 @@ export function listSessionsFromStore(params: {
includeDerivedTitles: includeTranscriptFields && includeDerivedTitles,
includeLastMessage: includeTranscriptFields && includeLastMessage,
transcriptUsageMaxBytes: sessionListTranscriptUsageMaxBytes,
storeChildSessionsByKey,
storeChildSessionsByKey: rowContext.storeChildSessionsByKey,
rowContext,
});
});
@@ -1853,7 +1925,7 @@ export async function listSessionsFromStoreAsync(params: {
const now = Date.now();
const sessionListTranscriptUsageMaxBytes = 64 * 1024;
const sessionListTranscriptFieldRows = 100;
const storeChildSessionsByKey = buildStoreChildSessionIndex(store, now);
const rowContext = buildSessionListRowContext({ store, now });
const includeDerivedTitles = opts.includeDerivedTitles === true;
const includeLastMessage = opts.includeLastMessage === true;
@@ -1874,7 +1946,8 @@ export async function listSessionsFromStoreAsync(params: {
includeDerivedTitles: false,
includeLastMessage: false,
transcriptUsageMaxBytes: sessionListTranscriptUsageMaxBytes,
storeChildSessionsByKey,
storeChildSessionsByKey: rowContext.storeChildSessionsByKey,
rowContext,
});
if (
entry?.sessionId &&

View File

@@ -27,6 +27,11 @@ export type SessionRunStatus = "running" | "done" | "failed" | "killed" | "timeo
type SubagentRunState = "active" | "interrupted" | "historical";
export type SessionCompactionCheckpointPreview = Pick<
SessionCompactionCheckpoint,
"checkpointId" | "createdAt" | "reason"
>;
export type GatewaySessionRow = {
key: string;
spawnedBy?: string;
@@ -84,7 +89,7 @@ export type GatewaySessionRow = {
lastAccountId?: string;
lastThreadId?: SessionEntry["lastThreadId"];
compactionCheckpointCount?: number;
latestCompactionCheckpoint?: SessionCompactionCheckpoint;
latestCompactionCheckpoint?: SessionCompactionCheckpointPreview;
pluginExtensions?: PluginSessionExtensionProjection[];
};

View File

@@ -409,6 +409,11 @@ export type SessionCompactionCheckpoint = {
postCompaction: SessionCompactionTranscriptReference;
};
export type SessionCompactionCheckpointPreview = Pick<
SessionCompactionCheckpoint,
"checkpointId" | "createdAt" | "reason"
>;
export type GatewaySessionRow = {
key: string;
spawnedBy?: string;
@@ -447,7 +452,7 @@ export type GatewaySessionRow = {
agentRuntime?: GatewayAgentRuntime;
contextTokens?: number;
compactionCheckpointCount?: number;
latestCompactionCheckpoint?: SessionCompactionCheckpoint;
latestCompactionCheckpoint?: SessionCompactionCheckpointPreview;
};
export type SessionsListResult = SessionsListResultBase<GatewaySessionsDefaults, GatewaySessionRow>;