Files
openclaw/src/gateway/server-maintenance.ts
2026-05-08 22:52:17 -04:00

216 lines
7.3 KiB
TypeScript

import type { HealthSummary } from "../commands/health.js";
import { sweepStaleRunContexts } from "../infra/agent-events.js";
import { cleanOldMedia } from "../media/store.js";
import { abortChatRunById, type ChatAbortControllerEntry } from "./chat-abort.js";
import { pruneStaleControlPlaneBuckets } from "./control-plane-rate-limit.js";
import type { ChatRunEntry } from "./server-chat.js";
import {
DEDUPE_MAX,
DEDUPE_TTL_MS,
HEALTH_REFRESH_INTERVAL_MS,
TICK_INTERVAL_MS,
} from "./server-constants.js";
import type { DedupeEntry } from "./server-shared.js";
import { formatError } from "./server-utils.js";
import { setBroadcastHealthUpdate } from "./server/health-state.js";
export function startGatewayMaintenanceTimers(params: {
broadcast: (
event: string,
payload: unknown,
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
) => void;
nodeSendToAllSubscribed: (event: string, payload: unknown) => void;
getPresenceVersion: () => number;
getHealthVersion: () => number;
refreshGatewayHealthSnapshot: (opts?: {
probe?: boolean;
includeSensitive?: boolean;
}) => Promise<HealthSummary>;
logHealth: { error: (msg: string) => void };
dedupe: Map<string, DedupeEntry>;
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
chatRunState: { abortedRuns: Map<string, number> };
chatRunBuffers: Map<string, string>;
chatDeltaSentAt: Map<string, number>;
chatDeltaLastBroadcastLen: Map<string, number>;
removeChatRun: (
sessionId: string,
clientRunId: string,
sessionKey?: string,
) => ChatRunEntry | undefined;
agentRunSeq: Map<string, number>;
nodeSendToSession: (sessionKey: string, event: string, payload: unknown) => void;
mediaCleanupTtlMs?: number;
}): {
tickInterval: ReturnType<typeof setInterval>;
healthInterval: ReturnType<typeof setInterval>;
dedupeCleanup: ReturnType<typeof setInterval>;
mediaCleanup: ReturnType<typeof setInterval> | null;
} {
setBroadcastHealthUpdate((snap: HealthSummary) => {
params.broadcast("health", snap, {
stateVersion: {
presence: params.getPresenceVersion(),
health: params.getHealthVersion(),
},
});
params.nodeSendToAllSubscribed("health", snap);
});
// periodic keepalive
const tickInterval = setInterval(() => {
const payload = { ts: Date.now() };
params.broadcast("tick", payload);
params.nodeSendToAllSubscribed("tick", payload);
}, TICK_INTERVAL_MS);
// periodic health refresh to keep cached snapshot warm
const healthInterval = setInterval(() => {
void params
.refreshGatewayHealthSnapshot({ probe: true })
.catch((err) => params.logHealth.error(`refresh failed: ${formatError(err)}`));
}, HEALTH_REFRESH_INTERVAL_MS);
// Prime cache so first client gets a snapshot without waiting.
void params
.refreshGatewayHealthSnapshot({ probe: true })
.catch((err) => params.logHealth.error(`initial refresh failed: ${formatError(err)}`));
// dedupe cache cleanup
const dedupeCleanup = setInterval(() => {
const AGENT_RUN_SEQ_MAX = 10_000;
const now = Date.now();
const isActiveRunDedupeKey = (key: string) => {
if (!key.startsWith("agent:") && !key.startsWith("chat:")) {
return false;
}
const runId = key.slice(key.indexOf(":") + 1);
const entry = runId ? params.chatAbortControllers.get(runId) : undefined;
if (!entry) {
return false;
}
return key.startsWith("agent:") ? entry.kind === "agent" : entry.kind !== "agent";
};
for (const [k, v] of params.dedupe) {
if (isActiveRunDedupeKey(k)) {
continue;
}
if (now - v.ts > DEDUPE_TTL_MS) {
params.dedupe.delete(k);
}
}
if (params.dedupe.size > DEDUPE_MAX) {
const excess = params.dedupe.size - DEDUPE_MAX;
const oldestKeys = [...params.dedupe.entries()]
.filter(([key]) => !isActiveRunDedupeKey(key))
.toSorted(([, left], [, right]) => left.ts - right.ts)
.slice(0, excess)
.map(([key]) => key);
for (const key of oldestKeys) {
params.dedupe.delete(key);
}
}
if (params.agentRunSeq.size > AGENT_RUN_SEQ_MAX) {
const excess = params.agentRunSeq.size - AGENT_RUN_SEQ_MAX;
let removed = 0;
for (const runId of params.agentRunSeq.keys()) {
params.agentRunSeq.delete(runId);
removed += 1;
if (removed >= excess) {
break;
}
}
}
for (const [runId, entry] of params.chatAbortControllers) {
if (now <= entry.expiresAtMs) {
continue;
}
abortChatRunById(
{
chatAbortControllers: params.chatAbortControllers,
chatRunBuffers: params.chatRunBuffers,
chatDeltaSentAt: params.chatDeltaSentAt,
chatDeltaLastBroadcastLen: params.chatDeltaLastBroadcastLen,
chatAbortedRuns: params.chatRunState.abortedRuns,
removeChatRun: params.removeChatRun,
agentRunSeq: params.agentRunSeq,
broadcast: params.broadcast,
nodeSendToSession: params.nodeSendToSession,
},
{ runId, sessionKey: entry.sessionKey, stopReason: "timeout" },
);
}
const ABORTED_RUN_TTL_MS = 60 * 60_000;
for (const [runId, abortedAt] of params.chatRunState.abortedRuns) {
if (now - abortedAt <= ABORTED_RUN_TTL_MS) {
continue;
}
params.chatRunState.abortedRuns.delete(runId);
params.chatRunBuffers.delete(runId);
params.chatDeltaSentAt.delete(runId);
params.chatDeltaLastBroadcastLen.delete(runId);
}
// Prune expired control-plane rate-limit buckets to prevent unbounded
// growth when many unique clients connect over time.
pruneStaleControlPlaneBuckets(now);
// Sweep stale buffers for runs that were never explicitly aborted.
// Only reap orphaned buffers after the abort controller is gone; active
// runs can legitimately sit idle while tools/models work.
for (const [runId, lastSentAt] of params.chatDeltaSentAt) {
if (params.chatRunState.abortedRuns.has(runId)) {
continue; // already handled above
}
if (params.chatAbortControllers.has(runId)) {
continue;
}
if (now - lastSentAt <= ABORTED_RUN_TTL_MS) {
continue;
}
params.chatRunBuffers.delete(runId);
params.chatDeltaSentAt.delete(runId);
params.chatDeltaLastBroadcastLen.delete(runId);
}
// Sweep stale agent run contexts (orphaned when lifecycle end/error is missed).
sweepStaleRunContexts();
}, 60_000);
if (typeof params.mediaCleanupTtlMs !== "number") {
return { tickInterval, healthInterval, dedupeCleanup, mediaCleanup: null };
}
let mediaCleanupInFlight: Promise<void> | null = null;
const runMediaCleanup = () => {
if (mediaCleanupInFlight) {
return mediaCleanupInFlight;
}
mediaCleanupInFlight = cleanOldMedia(params.mediaCleanupTtlMs, {
recursive: true,
pruneEmptyDirs: true,
})
.catch((err) => {
params.logHealth.error(`media cleanup failed: ${formatError(err)}`);
})
.finally(() => {
mediaCleanupInFlight = null;
});
return mediaCleanupInFlight;
};
const mediaCleanup = setInterval(() => {
void runMediaCleanup();
}, 60 * 60_000);
void runMediaCleanup();
return { tickInterval, healthInterval, dedupeCleanup, mediaCleanup };
}