mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-18 05:20:48 +00:00
Merged via squash.
Prepared head SHA: ffa45893e0
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
441 lines
15 KiB
TypeScript
441 lines
15 KiB
TypeScript
import { randomUUID } from "node:crypto";
|
|
import { normalizeModelRef, parseModelRef } from "../agents/model-selection.js";
|
|
import type { loadConfig } from "../config/config.js";
|
|
import { normalizePluginsConfig } from "../plugins/config-state.js";
|
|
import { loadOpenClawPlugins } from "../plugins/loader.js";
|
|
import { getPluginRuntimeGatewayRequestScope } from "../plugins/runtime/gateway-request-scope.js";
|
|
import { setGatewaySubagentRuntime } from "../plugins/runtime/index.js";
|
|
import type { PluginRuntime } from "../plugins/runtime/types.js";
|
|
import { ADMIN_SCOPE, WRITE_SCOPE } from "./method-scopes.js";
|
|
import { GATEWAY_CLIENT_IDS, GATEWAY_CLIENT_MODES } from "./protocol/client-info.js";
|
|
import type { ErrorShape } from "./protocol/index.js";
|
|
import { PROTOCOL_VERSION } from "./protocol/index.js";
|
|
import { handleGatewayRequest } from "./server-methods.js";
|
|
import type {
|
|
GatewayRequestContext,
|
|
GatewayRequestHandler,
|
|
GatewayRequestOptions,
|
|
} from "./server-methods/types.js";
|
|
|
|
// ── Fallback gateway context for non-WS paths (Telegram, WhatsApp, etc.) ──
// The WS path sets a per-request scope via AsyncLocalStorage, but channel
// adapters (Telegram polling, etc.) invoke the agent directly without going
// through handleGatewayRequest. We store the gateway context at startup so
// dispatchGatewayMethod can use it as a fallback.
|
|
|
|
/** Global-registry symbol under which the fallback context holder lives on `globalThis`. */
const FALLBACK_GATEWAY_CONTEXT_STATE_KEY: unique symbol = Symbol.for(
  "openclaw.fallbackGatewayContextState",
);

/** Mutable holder for the gateway context used when no per-request scope supplies one. */
type FallbackGatewayContextState = {
  context: GatewayRequestContext | undefined;
};

/**
 * Holder object stored on `globalThis` under the registry symbol above.
 * An object already present under that key is reused as-is; otherwise a new
 * holder with no context is created and registered.
 */
const fallbackGatewayContextState = ((): FallbackGatewayContextState => {
  const registry = globalThis as typeof globalThis & {
    [FALLBACK_GATEWAY_CONTEXT_STATE_KEY]?: FallbackGatewayContextState;
  };
  let state = registry[FALLBACK_GATEWAY_CONTEXT_STATE_KEY];
  if (!state) {
    state = { context: undefined };
    registry[FALLBACK_GATEWAY_CONTEXT_STATE_KEY] = state;
  }
  return state;
})();
|
|
|
|
/**
 * Records the gateway context that `dispatchGatewayMethod` falls back to when
 * the current call has no plugin-runtime request scope context.
 *
 * @param ctx Gateway request context to keep on the shared holder.
 */
export function setFallbackGatewayContext(ctx: GatewayRequestContext): void {
  // TODO: the stored value is a snapshot; it can go stale if runtime
  // config/context changes after it was recorded.
  fallbackGatewayContextState.context = ctx;
}
|
|
|
|
/** Per-plugin rules for provider/model overrides requested in fallback subagent runs. */
type PluginSubagentOverridePolicy = {
  /** True only when the plugin's config sets `subagent.allowModelOverride` to `true`. */
  allowModelOverride: boolean;
  /** True when the configured allowlist contains the `*` wildcard. */
  allowAnyModel: boolean;
  /** True when the plugin's config reports `subagent.hasAllowedModelsConfig === true`. */
  hasConfiguredAllowlist: boolean;
  /** Normalized `provider/model` references that survived normalization. */
  allowedModels: Set<string>;
};

/** Container for the current policy table, keyed by plugin id. */
type PluginSubagentPolicyState = {
  policies: Record<string, PluginSubagentOverridePolicy>;
};

/** Global-registry symbol under which the policy state lives on `globalThis`. */
const PLUGIN_SUBAGENT_POLICY_STATE_KEY: unique symbol = Symbol.for(
  "openclaw.pluginSubagentOverridePolicyState",
);

/**
 * Policy state stored on `globalThis` under the registry symbol above.
 * An object already present under that key is reused; otherwise a state with
 * an empty policy table is created and registered.
 */
const pluginSubagentPolicyState: PluginSubagentPolicyState = (() => {
  const registry = globalThis as typeof globalThis & {
    [PLUGIN_SUBAGENT_POLICY_STATE_KEY]?: PluginSubagentPolicyState;
  };
  let state = registry[PLUGIN_SUBAGENT_POLICY_STATE_KEY];
  if (!state) {
    state = { policies: {} };
    registry[PLUGIN_SUBAGENT_POLICY_STATE_KEY] = state;
  }
  return state;
})();
|
|
|
|
/**
 * Canonicalizes one configured allowlist entry.
 *
 * @param raw Entry as written in configuration.
 * @returns `"*"` for the wildcard, a `provider/model` string built from
 *   `normalizeModelRef` for a well-formed entry, or `null` when the entry is
 *   blank or lacks non-empty text on both sides of the first `/`.
 */
function normalizeAllowedModelRef(raw: string): string | null {
  const candidate = raw.trim();
  if (candidate === "*") {
    return "*";
  }

  // Only the first slash separates provider from model; later slashes stay in the model id.
  const separatorIndex = candidate.indexOf("/");
  const hasProviderPart = separatorIndex > 0;
  const hasModelPart = separatorIndex < candidate.length - 1;
  if (!hasProviderPart || !hasModelPart) {
    // Also covers the empty string (indexOf yields -1).
    return null;
  }

  const providerPart = candidate.slice(0, separatorIndex).trim();
  const modelPart = candidate.slice(separatorIndex + 1).trim();
  if (providerPart === "" || modelPart === "") {
    return null;
  }

  const canonical = normalizeModelRef(providerPart, modelPart);
  return `${canonical.provider}/${canonical.model}`;
}
|
|
|
|
/**
 * Rebuilds the plugin subagent override policy table from configuration and
 * replaces the table held in `pluginSubagentPolicyState`.
 *
 * Entries that neither enable overrides, nor report a configured allowlist,
 * nor yield any usable allowlist entry are left out of the table.
 *
 * @param cfg Loaded configuration whose `plugins` section is normalized here.
 */
function setPluginSubagentOverridePolicies(cfg: ReturnType<typeof loadConfig>): void {
  const pluginEntries = normalizePluginsConfig(cfg.plugins).entries;
  const nextPolicies: PluginSubagentPolicyState["policies"] = {};

  for (const [pluginId, entry] of Object.entries(pluginEntries)) {
    const subagent = entry.subagent;
    const allowModelOverride = subagent?.allowModelOverride === true;
    const hasConfiguredAllowlist = subagent?.hasAllowedModelsConfig === true;

    // Split the configured entries into the wildcard flag and concrete refs;
    // entries that fail normalization are dropped.
    const allowedModels = new Set<string>();
    let allowAnyModel = false;
    for (const configuredRef of subagent?.allowedModels ?? []) {
      const ref = normalizeAllowedModelRef(configuredRef);
      if (ref === "*") {
        allowAnyModel = true;
      } else if (ref) {
        allowedModels.add(ref);
      }
    }

    const hasAnyPolicySignal =
      allowModelOverride || hasConfiguredAllowlist || allowAnyModel || allowedModels.size > 0;
    if (hasAnyPolicySignal) {
      nextPolicies[pluginId] = {
        allowModelOverride,
        allowAnyModel,
        hasConfiguredAllowlist,
        allowedModels,
      };
    }
  }

  pluginSubagentPolicyState.policies = nextPolicies;
}
|
|
|
|
/**
 * Decides whether a provider/model override may be honoured for a subagent
 * run that has no request-scope client.
 *
 * Checks, in order: a plugin id must be present; the plugin's policy must
 * enable overrides; a wildcard allowlist permits everything; an allowlist
 * that was configured but normalized to nothing denies; no allowlist at all
 * permits everything; otherwise the requested model must resolve to a
 * canonical `provider/model` that is in the allowlist.
 *
 * @param params Plugin identity plus the requested provider/model.
 * @returns `{ allowed: true }`, or `{ allowed: false, reason }` describing the denial.
 */
function authorizeFallbackModelOverride(params: {
  pluginId?: string;
  provider?: string;
  model?: string;
}): { allowed: true } | { allowed: false; reason: string } {
  const allow = (): { allowed: true } => ({ allowed: true });
  const deny = (reason: string): { allowed: false; reason: string } => ({
    allowed: false,
    reason,
  });

  const pluginId = params.pluginId?.trim();
  if (!pluginId) {
    return deny("provider/model override requires plugin identity in fallback subagent runs.");
  }

  const policy = pluginSubagentPolicyState.policies[pluginId];
  if (!policy?.allowModelOverride) {
    return deny(
      `plugin "${pluginId}" is not trusted for fallback provider/model override requests.`,
    );
  }

  const { allowAnyModel, allowedModels, hasConfiguredAllowlist } = policy;
  if (allowAnyModel) {
    return allow();
  }
  if (allowedModels.size === 0) {
    // An allowlist that was configured but produced no valid entry must not
    // silently degrade into "allow everything".
    return hasConfiguredAllowlist
      ? deny(
          `plugin "${pluginId}" configured subagent.allowedModels, but none of the entries normalized to a valid provider/model target.`,
        )
      : allow();
  }

  const requestedModelRef = resolveRequestedFallbackModelRef(params);
  if (!requestedModelRef) {
    return deny(
      "fallback provider/model overrides that use an allowlist must resolve to a canonical provider/model target.",
    );
  }

  return allowedModels.has(requestedModelRef)
    ? allow()
    : deny(`model override "${requestedModelRef}" is not allowlisted for plugin "${pluginId}".`);
}
|
|
|
|
/**
 * Turns a requested provider/model pair into the `provider/model` string used
 * for allowlist comparison.
 *
 * When both fields are non-empty they are passed to `normalizeModelRef`.
 * Otherwise the model alone is used, and only if it contains a `/` and
 * `parseModelRef` yields both a provider and a model.
 *
 * @param params Requested provider and/or model.
 * @returns The canonical reference, or `null` when none can be derived.
 */
function resolveRequestedFallbackModelRef(params: {
  provider?: string;
  model?: string;
}): string | null {
  const { provider, model } = params;
  if (provider && model) {
    const explicit = normalizeModelRef(provider, model);
    return `${explicit.provider}/${explicit.model}`;
  }

  // Without an explicit provider the model string itself must carry one.
  const candidate = model?.trim();
  if (candidate === undefined || !candidate.includes("/")) {
    return null;
  }

  const parsed = parseModelRef(candidate, "");
  return parsed?.provider && parsed.model ? `${parsed.provider}/${parsed.model}` : null;
}
|
|
|
|
// ── Internal gateway dispatch for plugin runtime ────────────────────
|
|
|
|
/**
 * Builds the stand-in client used when a gateway method is dispatched without
 * a request-scope client.
 *
 * @param params Optional overrides: `scopes` replaces the default
 *   `[WRITE_SCOPE]`; `allowModelOverride` is honoured only when exactly `true`.
 * @returns A backend-mode operator client pinned to `PROTOCOL_VERSION`.
 */
function createSyntheticOperatorClient(params?: {
  allowModelOverride?: boolean;
  scopes?: string[];
}): GatewayRequestOptions["client"] {
  const scopes = params?.scopes ?? [WRITE_SCOPE];
  const allowModelOverride = params?.allowModelOverride === true;

  return {
    connect: {
      minProtocol: PROTOCOL_VERSION,
      maxProtocol: PROTOCOL_VERSION,
      client: {
        id: GATEWAY_CLIENT_IDS.GATEWAY_CLIENT,
        version: "internal",
        platform: "node",
        mode: GATEWAY_CLIENT_MODES.BACKEND,
      },
      role: "operator",
      scopes,
    },
    internal: { allowModelOverride },
  };
}
|
|
|
|
/**
 * Reports whether the client's connect scopes include `ADMIN_SCOPE`.
 * A missing client, missing connect info, or non-array scopes count as "no".
 */
function hasAdminScope(client: GatewayRequestOptions["client"]): boolean {
  const grantedScopes = client?.connect?.scopes;
  return Array.isArray(grantedScopes) && grantedScopes.includes(ADMIN_SCOPE);
}
|
|
|
|
/**
 * Reports whether a client may pass provider/model overrides: either it holds
 * the admin scope, or its internal `allowModelOverride` flag is exactly `true`.
 */
function canClientUseModelOverride(client: GatewayRequestOptions["client"]): boolean {
  if (hasAdminScope(client)) {
    return true;
  }
  return client?.internal?.allowModelOverride === true;
}
|
|
|
|
/**
 * Invokes a gateway method in-process through `handleGatewayRequest` and
 * returns its payload.
 *
 * Context and client come from the current plugin-runtime request scope when
 * present. Otherwise the stored fallback context and a synthetic operator
 * client are used.
 *
 * @param method Gateway method name.
 * @param params Parameters forwarded as the request params.
 * @param options Settings that apply only to the synthetic client:
 *   `allowSyntheticModelOverride` and `syntheticScopes`.
 * @returns The payload of the first response, cast to `T`.
 * @throws Error when neither a scope context nor a fallback context exists,
 *   when the handler finishes without responding, or when the response is not
 *   ok (using the response's error message when one is provided).
 */
async function dispatchGatewayMethod<T>(
  method: string,
  params: Record<string, unknown>,
  options?: {
    allowSyntheticModelOverride?: boolean;
    syntheticScopes?: string[];
  },
): Promise<T> {
  const scope = getPluginRuntimeGatewayRequestScope();
  const context = scope?.context ?? fallbackGatewayContextState.context;
  if (!context) {
    throw new Error(
      `Plugin subagent dispatch requires a gateway request scope (method: ${method}). No scope set and no fallback context available.`,
    );
  }

  const client =
    scope?.client ??
    createSyntheticOperatorClient({
      allowModelOverride: options?.allowSyntheticModelOverride === true,
      scopes: options?.syntheticScopes,
    });

  // Only the first respond() call is kept; later calls are ignored.
  let outcome: { ok: boolean; payload?: unknown; error?: ErrorShape } | undefined;
  await handleGatewayRequest({
    req: {
      type: "req",
      id: `plugin-subagent-${randomUUID()}`,
      method,
      params,
    },
    client,
    isWebchatConnect: scope?.isWebchatConnect ?? (() => false),
    respond: (ok, payload, error) => {
      if (!outcome) {
        outcome = { ok, payload, error };
      }
    },
    context,
  });

  if (!outcome) {
    throw new Error(`Gateway method "${method}" completed without a response.`);
  }
  if (outcome.ok) {
    return outcome.payload as T;
  }
  throw new Error(outcome.error?.message ?? `Gateway method "${method}" failed.`);
}
|
|
|
|
/**
 * Creates the subagent runtime that routes plugin subagent operations through
 * in-process gateway methods (`agent`, `agent.wait`, `sessions.get`,
 * `sessions.delete`).
 */
function createGatewaySubagentRuntime(): PluginRuntime["subagent"] {
  /** Fetches session messages via `sessions.get`; a non-array result becomes `[]`. */
  const getSessionMessages: PluginRuntime["subagent"]["getSessionMessages"] = async (params) => {
    const request: Record<string, unknown> = { key: params.sessionKey };
    if (params.limit != null) {
      request.limit = params.limit;
    }
    const payload = await dispatchGatewayMethod<{ messages?: unknown[] }>("sessions.get", request);
    const messages = payload?.messages;
    return { messages: Array.isArray(messages) ? messages : [] };
  };

  return {
    /**
     * Starts an agent run. A provider/model override is forwarded only when
     * authorized: by the request-scope client when there is one, otherwise by
     * the calling plugin's fallback override policy.
     */
    async run(params) {
      const scope = getPluginRuntimeGatewayRequestScope();
      const requestClient = scope?.client;
      const overrideRequested = Boolean(params.provider || params.model);

      let allowOverride = requestClient ? canClientUseModelOverride(requestClient) : false;
      let allowSyntheticModelOverride = false;
      if (overrideRequested && !allowOverride) {
        if (requestClient) {
          // A real client that lacks the permission is never upgraded via plugin policy.
          throw new Error("provider/model override is not authorized for this plugin subagent run.");
        }
        const verdict = authorizeFallbackModelOverride({
          pluginId: scope?.pluginId,
          provider: params.provider,
          model: params.model,
        });
        if (!verdict.allowed) {
          throw new Error(verdict.reason);
        }
        allowOverride = true;
        allowSyntheticModelOverride = true;
      }

      // Optional fields are added only when set, in a fixed key order.
      const request: Record<string, unknown> = {
        sessionKey: params.sessionKey,
        message: params.message,
        deliver: params.deliver ?? false,
      };
      if (allowOverride && params.provider) {
        request.provider = params.provider;
      }
      if (allowOverride && params.model) {
        request.model = params.model;
      }
      if (params.extraSystemPrompt) {
        request.extraSystemPrompt = params.extraSystemPrompt;
      }
      if (params.lane) {
        request.lane = params.lane;
      }
      if (params.idempotencyKey) {
        request.idempotencyKey = params.idempotencyKey;
      }

      const payload = await dispatchGatewayMethod<{ runId?: string }>("agent", request, {
        allowSyntheticModelOverride,
      });
      const runId = payload?.runId;
      if (typeof runId !== "string" || !runId) {
        throw new Error("Gateway agent method returned an invalid runId.");
      }
      return { runId };
    },

    /** Waits on a run via `agent.wait`; any status other than ok/error/timeout throws. */
    async waitForRun(params) {
      const request: Record<string, unknown> = { runId: params.runId };
      if (params.timeoutMs != null) {
        request.timeoutMs = params.timeoutMs;
      }
      const payload = await dispatchGatewayMethod<{ status?: string; error?: string }>(
        "agent.wait",
        request,
      );

      const status = payload?.status;
      if (status === "ok" || status === "error" || status === "timeout") {
        const failure = payload?.error;
        // An error field is included only when it is a non-empty string.
        return typeof failure === "string" && failure ? { status, error: failure } : { status };
      }
      throw new Error(`Gateway agent.wait returned unexpected status: ${status}`);
    },

    getSessionMessages,

    /** Delegates to `getSessionMessages` with the same parameters. */
    async getSession(params) {
      return getSessionMessages(params);
    },

    /**
     * Deletes a session via `sessions.delete`, removing the transcript unless
     * `deleteTranscript` is given. A synthetic client gets the admin scope here.
     */
    async deleteSession(params) {
      const request = {
        key: params.sessionKey,
        deleteTranscript: params.deleteTranscript ?? true,
      };
      await dispatchGatewayMethod("sessions.delete", request, {
        syntheticScopes: [ADMIN_SCOPE],
      });
    },
  };
}
|
|
|
|
// ── Plugin loading ──────────────────────────────────────────────────
|
|
|
|
/**
 * Loads plugins for the gateway process.
 *
 * In order: refreshes the plugin subagent override policies from `cfg`,
 * installs the gateway subagent runtime, loads the plugin registry, and
 * (unless `logDiagnostics` is `false`) logs each registry diagnostic.
 *
 * @param params Configuration, workspace directory, logger, core handlers,
 *   base method names and optional loader flags.
 * @returns The loaded registry and the de-duplicated list of base plus
 *   plugin-provided gateway method names (base methods first).
 */
export function loadGatewayPlugins(params: {
  cfg: ReturnType<typeof loadConfig>;
  workspaceDir: string;
  log: {
    info: (msg: string) => void;
    warn: (msg: string) => void;
    error: (msg: string) => void;
    debug: (msg: string) => void;
  };
  coreGatewayHandlers: Record<string, GatewayRequestHandler>;
  baseMethods: string[];
  preferSetupRuntimeForChannelPlugins?: boolean;
  logDiagnostics?: boolean;
}) {
  setPluginSubagentOverridePolicies(params.cfg);

  // The process-global gateway subagent runtime is set BEFORE plugins load.
  // Gateway-owned registries may already exist from schema loads, so the
  // gateway path opts those runtimes into late binding instead of changing
  // the default subagent behavior for every plugin runtime in the process.
  setGatewaySubagentRuntime(createGatewaySubagentRuntime());

  const pluginRegistry = loadOpenClawPlugins({
    config: params.cfg,
    workspaceDir: params.workspaceDir,
    logger: {
      info: (msg) => params.log.info(msg),
      warn: (msg) => params.log.warn(msg),
      error: (msg) => params.log.error(msg),
      debug: (msg) => params.log.debug(msg),
    },
    coreGatewayHandlers: params.coreGatewayHandlers,
    runtimeOptions: {
      allowGatewaySubagentBinding: true,
    },
    preferSetupRuntimeForChannelPlugins: params.preferSetupRuntimeForChannelPlugins,
  });

  const pluginMethods = Object.keys(pluginRegistry.gatewayHandlers);
  const gatewayMethods = [...new Set(params.baseMethods.concat(pluginMethods))];

  if (params.logDiagnostics ?? true) {
    for (const diag of pluginRegistry.diagnostics) {
      const details: string[] = [];
      if (diag.pluginId) {
        details.push(`plugin=${diag.pluginId}`);
      }
      if (diag.source) {
        details.push(`source=${diag.source}`);
      }
      const suffix = details.length > 0 ? ` (${details.join(", ")})` : "";
      const message = `[plugins] ${diag.message}${suffix}`;

      // Only error-level diagnostics go to the error channel; all others use info.
      if (diag.level === "error") {
        params.log.error(message);
      } else {
        params.log.info(message);
      }
    }
  }

  return { pluginRegistry, gatewayMethods };
}
|