mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:30:42 +00:00
fix: reap MCP runtimes on config reload
This commit is contained in:
@@ -15,6 +15,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
- Telegram/webhook: acknowledge validated webhook updates before running bot middleware, keeping slow agent turns from tripping Telegram delivery retries while preserving per-chat processing lanes. Fixes #71392.
|
||||
- MCP: retire one-shot embedded bundled MCP runtimes at run end, skip bundle-MCP startup when a runtime tool allowlist cannot reach bundle-MCP tools, and add `mcp.sessionIdleTtlMs` idle eviction for leaked session runtimes. Fixes #71106, #71110, #70389, and #70808.
|
||||
- MCP/config reload: hot-apply `mcp.*` changes by disposing cached session MCP runtimes, and dispose bundled MCP runtimes during gateway shutdown so removed `mcp.servers` entries reap child processes promptly. Fixes #60656.
|
||||
- Gateway/restart continuation: durably hand restart continuations to a session-delivery queue before deleting the restart sentinel, recover queued continuation work after crashy restarts, and fall back to a session-only wake when no channel route survives reboot. (#70780) Thanks @fuller-stack-dev.
|
||||
- Agents/tool-result pruning: harden the tool-result character estimator and context-pruning loops against malformed `{ type: "text" }` blocks created by void or undefined tool handler results, serializing non-string text payloads for size accounting so they cannot bypass trimming as zero-sized. Fixes #34979. (#51267) Thanks @cgdusek.
|
||||
- Daemon/service-env: add Nix Home Manager profile bin directories to generated gateway service PATHs on macOS and Linux, honoring `NIX_PROFILES` right-to-left precedence and falling back to `~/.nix-profile/bin` when unset. Fixes #44402. (#59935) Thanks @jerome-benoit.
|
||||
|
||||
@@ -85,6 +85,9 @@ target server during config edits.
|
||||
- `mcp.sessionIdleTtlMs`: idle TTL for session-scoped bundled MCP runtimes.
|
||||
One-shot embedded runs request run-end cleanup; this TTL is the backstop for
|
||||
long-lived sessions and future callers.
|
||||
- Changes under `mcp.*` hot-apply by disposing cached session MCP runtimes.
|
||||
The next tool discovery/use recreates them from the new config, so removed
|
||||
`mcp.servers` entries are reaped immediately instead of waiting for idle TTL.
|
||||
|
||||
See [MCP](/cli/mcp#openclaw-as-an-mcp-client-registry) and
|
||||
[CLI backends](/gateway/cli-backends#bundle-mcp-overlays) for runtime behavior.
|
||||
|
||||
@@ -545,7 +545,7 @@ Most fields hot-apply without downtime. In `hybrid` mode, restart-required chang
|
||||
| Agent & models | `agent`, `agents`, `models`, `routing` | No |
|
||||
| Automation | `hooks`, `cron`, `agent.heartbeat` | No |
|
||||
| Sessions & messages | `session`, `messages` | No |
|
||||
| Tools & media | `tools`, `browser`, `skills`, `audio`, `talk` | No |
|
||||
| Tools & media | `tools`, `browser`, `skills`, `mcp`, `audio`, `talk` | No |
|
||||
| UI & misc | `ui`, `logging`, `identity`, `bindings` | No |
|
||||
| Gateway server | `gateway.*` (port, bind, auth, tailscale, TLS, HTTP) | **Yes** |
|
||||
| Infrastructure | `discovery`, `canvasHost`, `plugins` | **Yes** |
|
||||
|
||||
@@ -19,6 +19,7 @@ export type GatewayReloadPlan = {
|
||||
restartHeartbeat: boolean;
|
||||
restartHealthMonitor: boolean;
|
||||
restartChannels: Set<ChannelKind>;
|
||||
disposeMcpRuntimes: boolean;
|
||||
noopPaths: string[];
|
||||
};
|
||||
|
||||
@@ -34,6 +35,7 @@ type ReloadAction =
|
||||
| "restart-cron"
|
||||
| "restart-heartbeat"
|
||||
| "restart-health-monitor"
|
||||
| "dispose-mcp-runtimes"
|
||||
| `restart-channel:${ChannelId}`;
|
||||
|
||||
export type GatewayReloadPlanOptions = {
|
||||
@@ -90,6 +92,7 @@ const BASE_RELOAD_RULES: ReloadRule[] = [
|
||||
},
|
||||
{ prefix: "agent.heartbeat", kind: "hot", actions: ["restart-heartbeat"] },
|
||||
{ prefix: "cron", kind: "hot", actions: ["restart-cron"] },
|
||||
{ prefix: "mcp", kind: "hot", actions: ["dispose-mcp-runtimes"] },
|
||||
];
|
||||
|
||||
const BASE_RELOAD_RULES_TAIL: ReloadRule[] = [
|
||||
@@ -276,6 +279,7 @@ export function buildGatewayReloadPlan(
|
||||
restartHeartbeat: false,
|
||||
restartHealthMonitor: false,
|
||||
restartChannels: new Set(),
|
||||
disposeMcpRuntimes: false,
|
||||
noopPaths: [],
|
||||
};
|
||||
|
||||
@@ -301,6 +305,9 @@ export function buildGatewayReloadPlan(
|
||||
case "restart-health-monitor":
|
||||
plan.restartHealthMonitor = true;
|
||||
break;
|
||||
case "dispose-mcp-runtimes":
|
||||
plan.disposeMcpRuntimes = true;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -322,6 +322,13 @@ describe("buildGatewayReloadPlan", () => {
|
||||
expect(plan.hotReasons).toContain("gateway.channelHealthCheckMinutes");
|
||||
});
|
||||
|
||||
it("hot-reloads MCP config changes by disposing cached runtimes", () => {
|
||||
const plan = buildGatewayReloadPlan(["mcp.servers.context7.command"]);
|
||||
expect(plan.restartGateway).toBe(false);
|
||||
expect(plan.disposeMcpRuntimes).toBe(true);
|
||||
expect(plan.hotReasons).toContain("mcp.servers.context7.command");
|
||||
});
|
||||
|
||||
it("treats gateway.remote as no-op", () => {
|
||||
const plan = buildGatewayReloadPlan(["gateway.remote.url"]);
|
||||
expect(plan.restartGateway).toBe(false);
|
||||
@@ -382,6 +389,12 @@ describe("buildGatewayReloadPlan", () => {
|
||||
expectHotPath: "agents.list",
|
||||
expectRestartHeartbeat: true,
|
||||
},
|
||||
{
|
||||
path: "mcp.servers.context7",
|
||||
expectRestartGateway: false,
|
||||
expectHotPath: "mcp.servers.context7",
|
||||
expectDisposeMcpRuntimes: true,
|
||||
},
|
||||
{
|
||||
path: "gateway.remote.url",
|
||||
expectRestartGateway: false,
|
||||
@@ -421,6 +434,9 @@ describe("buildGatewayReloadPlan", () => {
|
||||
if (testCase.expectRestartHeartbeat) {
|
||||
expect(plan.restartHeartbeat).toBe(true);
|
||||
}
|
||||
if (testCase.expectDisposeMcpRuntimes) {
|
||||
expect(plan.disposeMcpRuntimes).toBe(true);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -68,6 +68,7 @@ function isNoopReloadPlan(plan: GatewayReloadPlan): boolean {
|
||||
!plan.restartCron &&
|
||||
!plan.restartHeartbeat &&
|
||||
!plan.restartHealthMonitor &&
|
||||
!plan.disposeMcpRuntimes &&
|
||||
plan.restartChannels.size === 0
|
||||
);
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ function createReloadPlan(overrides?: Partial<GatewayReloadPlan>): GatewayReload
|
||||
restartHeartbeat: overrides?.restartHeartbeat ?? false,
|
||||
restartHealthMonitor: overrides?.restartHealthMonitor ?? false,
|
||||
restartChannels: overrides?.restartChannels ?? new Set(),
|
||||
disposeMcpRuntimes: overrides?.disposeMcpRuntimes ?? false,
|
||||
noopPaths: overrides?.noopPaths ?? [],
|
||||
};
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
const mocks = {
|
||||
logWarn: vi.fn(),
|
||||
disposeAgentHarnesses: vi.fn(async () => undefined),
|
||||
disposeAllSessionMcpRuntimes: vi.fn(async () => undefined),
|
||||
};
|
||||
const WEBSOCKET_CLOSE_GRACE_MS = 1_000;
|
||||
const WEBSOCKET_CLOSE_FORCE_CONTINUE_MS = 250;
|
||||
@@ -24,6 +25,13 @@ vi.mock("../agents/harness/registry.js", () => ({
|
||||
disposeRegisteredAgentHarnesses: mocks.disposeAgentHarnesses,
|
||||
}));
|
||||
|
||||
vi.mock("../agents/pi-bundle-mcp-tools.js", async () => ({
|
||||
...(await vi.importActual<typeof import("../agents/pi-bundle-mcp-tools.js")>(
|
||||
"../agents/pi-bundle-mcp-tools.js",
|
||||
)),
|
||||
disposeAllSessionMcpRuntimes: mocks.disposeAllSessionMcpRuntimes,
|
||||
}));
|
||||
|
||||
vi.mock("../logging/subsystem.js", () => ({
|
||||
createSubsystemLogger: vi.fn(() => ({
|
||||
warn: mocks.logWarn,
|
||||
@@ -78,6 +86,8 @@ describe("createGatewayCloseHandler", () => {
|
||||
vi.useRealTimers();
|
||||
mocks.logWarn.mockClear();
|
||||
mocks.disposeAgentHarnesses.mockClear();
|
||||
mocks.disposeAllSessionMcpRuntimes.mockClear();
|
||||
mocks.disposeAllSessionMcpRuntimes.mockResolvedValue(undefined);
|
||||
});
|
||||
|
||||
it("unsubscribes lifecycle listeners during shutdown", async () => {
|
||||
@@ -95,6 +105,23 @@ describe("createGatewayCloseHandler", () => {
|
||||
expect(lifecycleUnsub).toHaveBeenCalledTimes(1);
|
||||
expect(stopTaskRegistryMaintenance).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.disposeAgentHarnesses).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.disposeAllSessionMcpRuntimes).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("continues shutdown when bundle MCP runtime disposal hangs", async () => {
|
||||
vi.useFakeTimers();
|
||||
mocks.disposeAllSessionMcpRuntimes.mockReturnValue(new Promise(() => undefined));
|
||||
const close = createGatewayCloseHandler(createGatewayCloseTestDeps());
|
||||
|
||||
const closePromise = close({ reason: "test shutdown" });
|
||||
await vi.advanceTimersByTimeAsync(5_000);
|
||||
await closePromise;
|
||||
|
||||
expect(
|
||||
mocks.logWarn.mock.calls.some(([message]) =>
|
||||
String(message).includes("bundle-mcp runtime disposal exceeded 5000ms"),
|
||||
),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("terminates lingering websocket clients when websocket close exceeds the grace window", async () => {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import type { Server as HttpServer } from "node:http";
|
||||
import type { WebSocketServer } from "ws";
|
||||
import { disposeRegisteredAgentHarnesses } from "../agents/harness/registry.js";
|
||||
import { disposeAllSessionMcpRuntimes } from "../agents/pi-bundle-mcp-tools.js";
|
||||
import type { CanvasHostHandler, CanvasHostServer } from "../canvas-host/server.js";
|
||||
import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js";
|
||||
import { stopGmailWatcher } from "../hooks/gmail-watcher.js";
|
||||
@@ -14,6 +15,7 @@ const WEBSOCKET_CLOSE_GRACE_MS = 1_000;
|
||||
const WEBSOCKET_CLOSE_FORCE_CONTINUE_MS = 250;
|
||||
const HTTP_CLOSE_GRACE_MS = 1_000;
|
||||
const HTTP_CLOSE_FORCE_WAIT_MS = 5_000;
|
||||
const MCP_RUNTIME_CLOSE_GRACE_MS = 5_000;
|
||||
|
||||
function createTimeoutRace<T>(timeoutMs: number, onTimeout: () => T) {
|
||||
let timer: ReturnType<typeof setTimeout> | null = null;
|
||||
@@ -81,6 +83,7 @@ export function createGatewayCloseHandler(params: {
|
||||
releasePluginRouteRegistry?: (() => void) | null;
|
||||
stopChannel: (name: ChannelId, accountId?: string) => Promise<void>;
|
||||
pluginServices: PluginServicesHandle | null;
|
||||
disposeSessionMcpRuntimes?: () => Promise<void>;
|
||||
cron: { stop: () => void };
|
||||
heartbeatRunner: HeartbeatRunner;
|
||||
updateCheckStop?: (() => void) | null;
|
||||
@@ -138,6 +141,17 @@ export function createGatewayCloseHandler(params: {
|
||||
await params.stopChannel(plugin.id);
|
||||
}
|
||||
await disposeRegisteredAgentHarnesses();
|
||||
const disposeMcpRuntimes = params.disposeSessionMcpRuntimes ?? disposeAllSessionMcpRuntimes;
|
||||
const mcpDisposePromise = disposeMcpRuntimes().catch((err: unknown) => {
|
||||
shutdownLog.warn(`bundle-mcp runtime disposal failed during shutdown: ${String(err)}`);
|
||||
});
|
||||
const mcpDisposeTimeout = createTimeoutRace(MCP_RUNTIME_CLOSE_GRACE_MS, () => {
|
||||
shutdownLog.warn(
|
||||
`bundle-mcp runtime disposal exceeded ${MCP_RUNTIME_CLOSE_GRACE_MS}ms; continuing shutdown`,
|
||||
);
|
||||
});
|
||||
await Promise.race([mcpDisposePromise, mcpDisposeTimeout.promise]);
|
||||
mcpDisposeTimeout.clear();
|
||||
if (params.pluginServices) {
|
||||
await params.pluginServices.stop().catch(() => {});
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { resetModelCatalogCache } from "../agents/model-catalog.js";
|
||||
import { disposeAllSessionMcpRuntimes } from "../agents/pi-bundle-mcp-tools.js";
|
||||
import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/runs.js";
|
||||
import { getTotalPendingReplies } from "../auto-reply/reply/dispatcher-registry.js";
|
||||
import type { CliDeps } from "../cli/deps.types.js";
|
||||
@@ -57,6 +58,31 @@ type GatewayReloadLog = {
|
||||
warn: (msg: string) => void;
|
||||
};
|
||||
|
||||
const MCP_RUNTIME_RELOAD_DISPOSE_TIMEOUT_MS = 5_000;
|
||||
|
||||
async function disposeMcpRuntimesWithTimeout(params: {
|
||||
dispose: () => Promise<void>;
|
||||
timeoutMs: number;
|
||||
onWarn: (message: string) => void;
|
||||
label: string;
|
||||
}) {
|
||||
let timer: ReturnType<typeof setTimeout> | undefined;
|
||||
const disposePromise = params.dispose().catch((error: unknown) => {
|
||||
params.onWarn(`${params.label} failed: ${String(error)}`);
|
||||
});
|
||||
const timeoutPromise = new Promise<"timeout">((resolve) => {
|
||||
timer = setTimeout(() => resolve("timeout"), params.timeoutMs);
|
||||
timer.unref?.();
|
||||
});
|
||||
const result = await Promise.race([disposePromise.then(() => "done" as const), timeoutPromise]);
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
if (result === "timeout") {
|
||||
params.onWarn(`${params.label} exceeded ${params.timeoutMs}ms; continuing`);
|
||||
}
|
||||
}
|
||||
|
||||
type GatewayReloadHandlerParams = {
|
||||
deps: CliDeps;
|
||||
broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void;
|
||||
@@ -151,6 +177,15 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams)
|
||||
nextState.channelHealthMonitor = params.createHealthMonitor(nextConfig);
|
||||
}
|
||||
|
||||
if (plan.disposeMcpRuntimes) {
|
||||
await disposeMcpRuntimesWithTimeout({
|
||||
dispose: disposeAllSessionMcpRuntimes,
|
||||
timeoutMs: MCP_RUNTIME_RELOAD_DISPOSE_TIMEOUT_MS,
|
||||
onWarn: params.logReload.warn,
|
||||
label: "bundle-mcp runtime disposal during config reload",
|
||||
});
|
||||
}
|
||||
|
||||
if (plan.restartGmailWatcher) {
|
||||
await stopGmailWatcher().catch((err) => {
|
||||
params.logHooks.warn(`gmail watcher stop failed during reload: ${String(err)}`);
|
||||
|
||||
@@ -55,6 +55,7 @@ const hoisted = vi.hoisted(() => {
|
||||
const startGmailWatcher = vi.fn(async () => ({ started: true }));
|
||||
const stopGmailWatcher = vi.fn(async () => {});
|
||||
const resetModelCatalogCache = vi.fn();
|
||||
const disposeAllSessionMcpRuntimes = vi.fn(async () => {});
|
||||
|
||||
const providerManager = {
|
||||
getRuntimeSnapshot: vi.fn(() => ({
|
||||
@@ -156,6 +157,7 @@ const hoisted = vi.hoisted(() => {
|
||||
startGmailWatcher,
|
||||
stopGmailWatcher,
|
||||
resetModelCatalogCache,
|
||||
disposeAllSessionMcpRuntimes,
|
||||
providerManager,
|
||||
createChannelManager,
|
||||
startGatewayConfigReloader,
|
||||
@@ -193,6 +195,16 @@ vi.mock("../agents/model-catalog.js", async () => {
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../agents/pi-bundle-mcp-tools.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("../agents/pi-bundle-mcp-tools.js")>(
|
||||
"../agents/pi-bundle-mcp-tools.js",
|
||||
);
|
||||
return {
|
||||
...actual,
|
||||
disposeAllSessionMcpRuntimes: hoisted.disposeAllSessionMcpRuntimes,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../agents/pi-embedded-runner/runs.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("../agents/pi-embedded-runner/runs.js")>(
|
||||
"../agents/pi-embedded-runner/runs.js",
|
||||
@@ -282,6 +294,8 @@ describe("gateway hot reload", () => {
|
||||
hoisted.totalQueueSize.value = 0;
|
||||
hoisted.activeTaskCount.value = 0;
|
||||
hoisted.resetModelCatalogCache.mockReset();
|
||||
hoisted.disposeAllSessionMcpRuntimes.mockReset();
|
||||
hoisted.disposeAllSessionMcpRuntimes.mockResolvedValue(undefined);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
@@ -589,6 +603,37 @@ describe("gateway hot reload", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("disposes cached MCP runtimes on MCP config hot reloads", async () => {
|
||||
await withGatewayServer(async () => {
|
||||
const onHotReload = hoisted.getOnHotReload();
|
||||
expect(onHotReload).toBeTypeOf("function");
|
||||
|
||||
await onHotReload?.(
|
||||
{
|
||||
changedPaths: ["mcp.servers.context7.command"],
|
||||
restartGateway: false,
|
||||
restartReasons: [],
|
||||
hotReasons: ["mcp.servers.context7.command"],
|
||||
reloadHooks: false,
|
||||
restartGmailWatcher: false,
|
||||
restartCron: false,
|
||||
restartHeartbeat: false,
|
||||
restartHealthMonitor: false,
|
||||
restartChannels: new Set(),
|
||||
disposeMcpRuntimes: true,
|
||||
noopPaths: [],
|
||||
},
|
||||
{
|
||||
mcp: {
|
||||
servers: {},
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
expect(hoisted.disposeAllSessionMcpRuntimes).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
it("makes newly available catalog models visible in-process after hot reload", async () => {
|
||||
type TestRegistryEntry = { provider: string; id: string; name: string };
|
||||
let registryEntries: TestRegistryEntry[] = [
|
||||
|
||||
@@ -99,6 +99,7 @@ const browserSessionTabMocks = vi.hoisted(() => ({
|
||||
}));
|
||||
const bundleMcpRuntimeMocks = vi.hoisted(() => ({
|
||||
disposeSessionMcpRuntime: vi.fn(async (_sessionId: string) => {}),
|
||||
disposeAllSessionMcpRuntimes: vi.fn(async () => {}),
|
||||
}));
|
||||
|
||||
vi.mock("../auto-reply/reply/queue.js", async () => {
|
||||
@@ -207,6 +208,7 @@ vi.mock("../plugin-sdk/browser-maintenance.js", () => ({
|
||||
|
||||
vi.mock("../agents/pi-bundle-mcp-tools.js", () => ({
|
||||
disposeSessionMcpRuntime: bundleMcpRuntimeMocks.disposeSessionMcpRuntime,
|
||||
disposeAllSessionMcpRuntimes: bundleMcpRuntimeMocks.disposeAllSessionMcpRuntimes,
|
||||
retireSessionMcpRuntime: ({ sessionId }: { sessionId?: string | null }) =>
|
||||
sessionId
|
||||
? bundleMcpRuntimeMocks.disposeSessionMcpRuntime(sessionId).then(() => true)
|
||||
|
||||
Reference in New Issue
Block a user