mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:50:43 +00:00
fix(gateway): defer embedded runner imports
This commit is contained in:
@@ -50,6 +50,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Gateway/startup: lazy-load plugin HTTP route dispatch when active plugin routes exist so no-plugin Gateway boot skips plugin route runtime scope setup. Thanks @vincentkoc.
|
||||
- Gateway/startup: move chat run/subscriber registries onto a lightweight state module and defer chat/session event projection until the first event so Gateway boot skips session IO imports. Thanks @vincentkoc.
|
||||
- Gateway/startup: keep node session runtime on a lightweight JSON parser instead of importing gateway method validation helpers during boot. Thanks @vincentkoc.
|
||||
- Gateway/startup: read embedded-run activity from a lightweight shared state module so restart deferral no longer imports the embedded runner during Gateway boot. Thanks @vincentkoc.
|
||||
- CLI/Gateway: use a parse-only config snapshot for plain `gateway status` reads and reuse same-path service config context so status no longer spends tens of seconds in full config validation before printing. Thanks @vincentkoc.
|
||||
- Lobster/Gateway: memoize repeated Ajv schema compilation before loading the embedded Lobster runtime so scheduled workflows and `llm.invoke` loops stop growing gateway heap on content-identical schemas. Fixes #71148. Thanks @cmi525, @vsolaz, and @vincentkoc.
|
||||
- Codex harness: normalize cached input tokens before session/context accounting so prompt cache reads are not double-counted in `/status`, `session_status`, or persisted `sessionEntry.totalTokens`. Fixes #69298. Thanks @richardmqq.
|
||||
|
||||
68
src/agents/pi-embedded-runner/run-state.ts
Normal file
68
src/agents/pi-embedded-runner/run-state.ts
Normal file
@@ -0,0 +1,68 @@
|
||||
import {
|
||||
getActiveReplyRunCount,
|
||||
listActiveReplyRunSessionIds,
|
||||
} from "../../auto-reply/reply/reply-run-registry.js";
|
||||
import { resolveGlobalSingleton } from "../../shared/global-singleton.js";
|
||||
|
||||
export type EmbeddedPiQueueHandle = {
|
||||
kind?: "embedded";
|
||||
queueMessage: (text: string) => Promise<void>;
|
||||
isStreaming: () => boolean;
|
||||
isCompacting: () => boolean;
|
||||
cancel?: (reason?: "user_abort" | "restart" | "superseded") => void;
|
||||
abort: () => void;
|
||||
};
|
||||
|
||||
export type ActiveEmbeddedRunSnapshot = {
|
||||
transcriptLeafId: string | null;
|
||||
messages?: unknown[];
|
||||
inFlightPrompt?: string;
|
||||
};
|
||||
|
||||
export type EmbeddedRunModelSwitchRequest = {
|
||||
provider: string;
|
||||
model: string;
|
||||
authProfileId?: string;
|
||||
authProfileIdSource?: "auto" | "user";
|
||||
};
|
||||
|
||||
export type EmbeddedRunWaiter = {
|
||||
resolve: (ended: boolean) => void;
|
||||
timer: NodeJS.Timeout;
|
||||
};
|
||||
|
||||
const EMBEDDED_RUN_STATE_KEY = Symbol.for("openclaw.embeddedRunState");
|
||||
|
||||
const embeddedRunState = resolveGlobalSingleton(EMBEDDED_RUN_STATE_KEY, () => ({
|
||||
activeRuns: new Map<string, EmbeddedPiQueueHandle>(),
|
||||
snapshots: new Map<string, ActiveEmbeddedRunSnapshot>(),
|
||||
sessionIdsByKey: new Map<string, string>(),
|
||||
waiters: new Map<string, Set<EmbeddedRunWaiter>>(),
|
||||
modelSwitchRequests: new Map<string, EmbeddedRunModelSwitchRequest>(),
|
||||
}));
|
||||
|
||||
export const ACTIVE_EMBEDDED_RUNS =
|
||||
embeddedRunState.activeRuns ??
|
||||
(embeddedRunState.activeRuns = new Map<string, EmbeddedPiQueueHandle>());
|
||||
export const ACTIVE_EMBEDDED_RUN_SNAPSHOTS =
|
||||
embeddedRunState.snapshots ??
|
||||
(embeddedRunState.snapshots = new Map<string, ActiveEmbeddedRunSnapshot>());
|
||||
export const ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_KEY =
|
||||
embeddedRunState.sessionIdsByKey ??
|
||||
(embeddedRunState.sessionIdsByKey = new Map<string, string>());
|
||||
export const EMBEDDED_RUN_WAITERS =
|
||||
embeddedRunState.waiters ??
|
||||
(embeddedRunState.waiters = new Map<string, Set<EmbeddedRunWaiter>>());
|
||||
export const EMBEDDED_RUN_MODEL_SWITCH_REQUESTS =
|
||||
embeddedRunState.modelSwitchRequests ??
|
||||
(embeddedRunState.modelSwitchRequests = new Map<string, EmbeddedRunModelSwitchRequest>());
|
||||
|
||||
export function getActiveEmbeddedRunCount(): number {
|
||||
let activeCount = ACTIVE_EMBEDDED_RUNS.size;
|
||||
for (const sessionId of listActiveReplyRunSessionIds()) {
|
||||
if (!ACTIVE_EMBEDDED_RUNS.has(sessionId)) {
|
||||
activeCount += 1;
|
||||
}
|
||||
}
|
||||
return Math.max(activeCount, getActiveReplyRunCount());
|
||||
}
|
||||
@@ -1,10 +1,8 @@
|
||||
import {
|
||||
abortActiveReplyRuns,
|
||||
abortReplyRunBySessionId,
|
||||
getActiveReplyRunCount,
|
||||
isReplyRunActiveForSessionId,
|
||||
isReplyRunStreamingForSessionId,
|
||||
listActiveReplyRunSessionIds,
|
||||
queueReplyRunMessage,
|
||||
resolveActiveReplyRunSessionId,
|
||||
waitForReplyRunEndBySessionId,
|
||||
@@ -14,64 +12,26 @@ import {
|
||||
logMessageQueued,
|
||||
logSessionStateChange,
|
||||
} from "../../logging/diagnostic.js";
|
||||
import { resolveGlobalSingleton } from "../../shared/global-singleton.js";
|
||||
import { normalizeOptionalString } from "../../shared/string-coerce.js";
|
||||
import {
|
||||
ACTIVE_EMBEDDED_RUNS,
|
||||
ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_KEY,
|
||||
ACTIVE_EMBEDDED_RUN_SNAPSHOTS,
|
||||
EMBEDDED_RUN_MODEL_SWITCH_REQUESTS,
|
||||
EMBEDDED_RUN_WAITERS,
|
||||
getActiveEmbeddedRunCount,
|
||||
type ActiveEmbeddedRunSnapshot,
|
||||
type EmbeddedPiQueueHandle,
|
||||
type EmbeddedRunModelSwitchRequest,
|
||||
type EmbeddedRunWaiter,
|
||||
} from "./run-state.js";
|
||||
|
||||
export type EmbeddedPiQueueHandle = {
|
||||
kind?: "embedded";
|
||||
queueMessage: (text: string) => Promise<void>;
|
||||
isStreaming: () => boolean;
|
||||
isCompacting: () => boolean;
|
||||
cancel?: (reason?: "user_abort" | "restart" | "superseded") => void;
|
||||
abort: () => void;
|
||||
};
|
||||
|
||||
export type ActiveEmbeddedRunSnapshot = {
|
||||
transcriptLeafId: string | null;
|
||||
messages?: unknown[];
|
||||
inFlightPrompt?: string;
|
||||
};
|
||||
|
||||
type EmbeddedRunWaiter = {
|
||||
resolve: (ended: boolean) => void;
|
||||
timer: NodeJS.Timeout;
|
||||
};
|
||||
|
||||
export type EmbeddedRunModelSwitchRequest = {
|
||||
provider: string;
|
||||
model: string;
|
||||
authProfileId?: string;
|
||||
authProfileIdSource?: "auto" | "user";
|
||||
};
|
||||
|
||||
/**
|
||||
* Use global singleton state so busy/streaming checks stay consistent even
|
||||
* when the bundler emits multiple copies of this module into separate chunks.
|
||||
*/
|
||||
const EMBEDDED_RUN_STATE_KEY = Symbol.for("openclaw.embeddedRunState");
|
||||
|
||||
const embeddedRunState = resolveGlobalSingleton(EMBEDDED_RUN_STATE_KEY, () => ({
|
||||
activeRuns: new Map<string, EmbeddedPiQueueHandle>(),
|
||||
snapshots: new Map<string, ActiveEmbeddedRunSnapshot>(),
|
||||
sessionIdsByKey: new Map<string, string>(),
|
||||
waiters: new Map<string, Set<EmbeddedRunWaiter>>(),
|
||||
modelSwitchRequests: new Map<string, EmbeddedRunModelSwitchRequest>(),
|
||||
}));
|
||||
const ACTIVE_EMBEDDED_RUNS =
|
||||
embeddedRunState.activeRuns ??
|
||||
(embeddedRunState.activeRuns = new Map<string, EmbeddedPiQueueHandle>());
|
||||
const ACTIVE_EMBEDDED_RUN_SNAPSHOTS =
|
||||
embeddedRunState.snapshots ??
|
||||
(embeddedRunState.snapshots = new Map<string, ActiveEmbeddedRunSnapshot>());
|
||||
const ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_KEY =
|
||||
embeddedRunState.sessionIdsByKey ??
|
||||
(embeddedRunState.sessionIdsByKey = new Map<string, string>());
|
||||
const EMBEDDED_RUN_WAITERS =
|
||||
embeddedRunState.waiters ??
|
||||
(embeddedRunState.waiters = new Map<string, Set<EmbeddedRunWaiter>>());
|
||||
const EMBEDDED_RUN_MODEL_SWITCH_REQUESTS =
|
||||
embeddedRunState.modelSwitchRequests ??
|
||||
(embeddedRunState.modelSwitchRequests = new Map<string, EmbeddedRunModelSwitchRequest>());
|
||||
export {
|
||||
getActiveEmbeddedRunCount,
|
||||
type ActiveEmbeddedRunSnapshot,
|
||||
type EmbeddedPiQueueHandle,
|
||||
type EmbeddedRunModelSwitchRequest,
|
||||
} from "./run-state.js";
|
||||
|
||||
function setActiveRunSessionKey(sessionKey: string | undefined, sessionId: string): void {
|
||||
const normalizedSessionKey = sessionKey?.trim();
|
||||
@@ -216,16 +176,6 @@ export function resolveActiveEmbeddedRunSessionId(sessionKey: string): string |
|
||||
);
|
||||
}
|
||||
|
||||
export function getActiveEmbeddedRunCount(): number {
|
||||
let activeCount = ACTIVE_EMBEDDED_RUNS.size;
|
||||
for (const sessionId of listActiveReplyRunSessionIds()) {
|
||||
if (!ACTIVE_EMBEDDED_RUNS.has(sessionId)) {
|
||||
activeCount += 1;
|
||||
}
|
||||
}
|
||||
return Math.max(activeCount, getActiveReplyRunCount());
|
||||
}
|
||||
|
||||
export function getActiveEmbeddedRunSnapshot(
|
||||
sessionId: string,
|
||||
): ActiveEmbeddedRunSnapshot | undefined {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { resetModelCatalogCache } from "../agents/model-catalog.js";
|
||||
import { disposeAllSessionMcpRuntimes } from "../agents/pi-bundle-mcp-tools.js";
|
||||
import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/runs.js";
|
||||
import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/run-state.js";
|
||||
import { getTotalPendingReplies } from "../auto-reply/reply/dispatcher-registry.js";
|
||||
import type { CliDeps } from "../cli/deps.types.js";
|
||||
import { resolveAgentMaxConcurrent, resolveSubagentMaxConcurrent } from "../config/agent-limits.js";
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/runs.js";
|
||||
import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/run-state.js";
|
||||
import { getTotalPendingReplies } from "../auto-reply/reply/dispatcher-registry.js";
|
||||
import type { CanvasHostServer } from "../canvas-host/server.js";
|
||||
import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js";
|
||||
|
||||
@@ -216,6 +216,16 @@ vi.mock("../agents/pi-embedded-runner/runs.js", async () => {
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../agents/pi-embedded-runner/run-state.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("../agents/pi-embedded-runner/run-state.js")>(
|
||||
"../agents/pi-embedded-runner/run-state.js",
|
||||
);
|
||||
return {
|
||||
...actual,
|
||||
getActiveEmbeddedRunCount: () => hoisted.activeEmbeddedRunCount.value,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../auto-reply/reply/dispatcher-registry.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("../auto-reply/reply/dispatcher-registry.js")>(
|
||||
"../auto-reply/reply/dispatcher-registry.js",
|
||||
|
||||
Reference in New Issue
Block a user