fix(context-engine): quarantine broken plugin engines

This commit is contained in:
Vincent Koc
2026-05-28 13:53:08 +01:00
parent 9813ff2f0a
commit 14ce8733fe
11 changed files with 864 additions and 48 deletions

View File

@@ -251,6 +251,20 @@ Native Codex and OpenClaw embedded agent runs satisfy `assemble-before-prompt`.
Generic CLI backends do not, so engines that require it are rejected before the
CLI process starts.
### Failure isolation
OpenClaw isolates the selected plugin engine from the core reply path. If a
non-legacy engine is not registered, fails contract validation, throws from its
factory, or throws from a lifecycle method, OpenClaw quarantines that engine
for the current Gateway process and downgrades context-engine work to the
built-in `legacy` engine. Rejections caused by an aborted or cancelled run are
not treated as engine failures and do not quarantine the engine. The error is
logged with the failed operation so the operator can repair, update, or disable
the plugin without the agent going silent.
Host requirement failures are different: when an engine declares that a runtime
lacks a required capability, OpenClaw fails closed before starting the run. That
protects engines that would corrupt state if they ran in an unsupported host.
### ownsCompaction
`ownsCompaction` controls whether OpenClaw runtime's built-in in-attempt auto-compaction stays enabled for the run:
@@ -321,7 +335,7 @@ The slot is exclusive at run time - only one registered context engine is resolv
- Use `openclaw doctor` to verify your engine is loading correctly.
- If switching engines, existing sessions continue with their current history. The new engine takes over for future runs.
- Engine errors are logged and surfaced in diagnostics. If a plugin engine fails to register or the selected engine id cannot be resolved, OpenClaw does not fall back automatically; runs fail until you fix the plugin or switch `plugins.slots.contextEngine` back to `"legacy"`.
- Engine errors are logged and the selected plugin engine is quarantined for the current Gateway process. OpenClaw falls back to `legacy` for user turns so replies can continue, but you should still repair, update, disable, or uninstall the broken plugin.
- For development, use `openclaw plugins install -l ./my-engine` to link a local plugin directory without copying.
## Related

View File

@@ -127,6 +127,28 @@ export default definePluginEntry({
docsPath: "/providers/kitchen-sink",
auth: [],
});
api.registerContextEngine("${pluginId}", () => ({
info: {
id: "${pluginId}",
name: "Kitchen Sink Context Engine",
},
async ingest() {
return { ingested: false };
},
async assemble(params) {
return {
messages: params.messages,
estimatedTokens: 0,
};
},
async compact() {
return {
ok: true,
compacted: false,
reason: "kitchen-sink fixture does not compact",
};
},
}));
api.registerChannel({
plugin: {
id: "kitchen-sink-channel",
@@ -151,6 +173,7 @@ export default definePluginEntry({
manifest: {
id: pluginId,
name: "OpenClaw Kitchen Sink",
kind: "context-engine",
channels: ["kitchen-sink-channel"],
channelConfigs: {
"kitchen-sink-channel": {

View File

@@ -373,6 +373,9 @@ function assertInstalled() {
expectIncludes(inspect.plugin?.channelIds, "kitchen-sink-channel", "channels");
expectIncludes(inspect.plugin?.providerIds, "kitchen-sink-provider", "providers");
}
if (source === "clawhub") {
expectIncludes(inspect.plugin?.contextEngineIds, pluginId, "context engines");
}
const diagnostics = [
...(list.diagnostics || []),

View File

@@ -2,7 +2,12 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
import { stripAnsi } from "../terminal/ansi.js";
import { formatHealthCheckFailure } from "./health-format.js";
import type { HealthSummary } from "./health.js";
import { formatHealthChannelLines, formatModelPricingHealthLine, healthCommand } from "./health.js";
import {
formatContextEngineHealthLine,
formatHealthChannelLines,
formatModelPricingHealthLine,
healthCommand,
} from "./health.js";
const runtime = {
log: vi.fn(),
@@ -341,6 +346,31 @@ describe("healthCommand", () => {
});
});
// Covers the one-line health warning rendered when the summary carries
// quarantined context engines.
describe("formatContextEngineHealthLine", () => {
it("summarizes quarantined context engines", () => {
// Start from a summary with no channels so only the context-engine section matters.
const summary = createHealthSummary({
channels: {},
channelOrder: [],
channelLabels: {},
});
summary.contextEngines = {
quarantined: [
{
engineId: "lossless-claw",
owner: "plugin:lossless-claw",
operation: "assemble",
reason: "db corrupt",
failedAt: 123,
},
],
};
// Only the count and the engine ids appear in the line; owner, operation and
// reason are not part of the expected text.
expect(formatContextEngineHealthLine(summary)).toBe(
"Context engine: warning (1 quarantined; downgraded to legacy: lossless-claw)",
);
});
});
describe("formatHealthCheckFailure", () => {
it("keeps non-rich output stable", () => {
const err = new Error("gateway closed (1006 abnormal closure): no close reason");

View File

@@ -13,6 +13,7 @@ import { withProgress } from "../cli/progress.js";
import { getRuntimeConfig } from "../config/config.js";
import { resolveStorePath } from "../config/sessions/paths.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { listContextEngineQuarantines } from "../context-engine/registry.js";
import {
buildGatewayConnectionDetails,
callGateway,
@@ -43,6 +44,7 @@ import type {
AgentHealthSummary,
ChannelAccountHealthSummary,
ChannelHealthSummary,
ContextEngineHealthSummary,
HealthSummary,
PluginHealthErrorSummary,
PluginHealthSummary,
@@ -166,6 +168,32 @@ export function formatModelPricingHealthLine(summary: HealthSummary): string | n
return `Model pricing: warning (optional pricing refresh degraded)${detail}`;
}
/**
 * Convert the quarantine records returned by `listContextEngineQuarantines()`
 * into the shape used by health summaries.
 *
 * `failedAt` is converted from a `Date` to epoch milliseconds, and `owner` is
 * only copied when the record has a truthy owner.
 *
 * @returns The quarantine summary, or `undefined` when nothing is quarantined.
 */
function buildContextEngineHealthSummary(): ContextEngineHealthSummary | undefined {
  type QuarantineItem = ContextEngineHealthSummary["quarantined"][number];

  const quarantined = listContextEngineQuarantines().map((record): QuarantineItem => {
    const item: QuarantineItem = {
      engineId: record.engineId,
      operation: record.operation,
      reason: record.reason,
      failedAt: record.failedAt.getTime(),
    };
    if (record.owner) {
      item.owner = record.owner;
    }
    return item;
  });

  if (quarantined.length === 0) {
    return undefined;
  }
  return { quarantined };
}
/**
 * Render the context-engine warning line for health output.
 *
 * @param summary Health summary whose optional `contextEngines.quarantined`
 *   list is inspected.
 * @returns A single warning line naming every quarantined engine id, or `null`
 *   when the list is absent or empty.
 */
export function formatContextEngineHealthLine(summary: HealthSummary): string | null {
  const entries = summary.contextEngines?.quarantined;
  if (!entries || entries.length === 0) {
    return null;
  }
  const engineIds: string[] = [];
  for (const entry of entries) {
    engineIds.push(entry.engineId);
  }
  const count = entries.length;
  const engines = engineIds.join(", ");
  return `Context engine: warning (${count} quarantined; downgraded to legacy: ${engines})`;
}
const resolveHeartbeatSummary = (cfg: OpenClawConfig, agentId: string) =>
resolveHeartbeatSummaryForAgent(cfg, agentId);
@@ -571,12 +599,14 @@ export async function getHealthSnapshot(params?: {
}
const pluginHealth = buildPluginHealthSummary();
const contextEngineHealth = buildContextEngineHealthSummary();
const summary: HealthSummary = {
ok: true,
ts: Date.now(),
durationMs: Date.now() - start,
...(params?.eventLoop ? { eventLoop: params.eventLoop } : {}),
...(pluginHealth ? { plugins: pluginHealth } : {}),
...(contextEngineHealth ? { contextEngines: contextEngineHealth } : {}),
modelPricing: getGatewayModelPricingHealth({ enabled: isGatewayModelPricingEnabled(cfg) }),
channels,
channelOrder,
@@ -782,6 +812,10 @@ export async function healthCommand(
if (modelPricingLine) {
runtime.log(styleHealthChannelLine(modelPricingLine, rich));
}
const contextEngineLine = formatContextEngineHealthLine(summary);
if (contextEngineLine) {
runtime.log(styleHealthChannelLine(contextEngineLine, rich));
}
for (const plugin of displayPlugins) {
const channelSummary = summary.channels?.[plugin.id];
if (!channelSummary || channelSummary.linked !== true) {

View File

@@ -35,6 +35,18 @@ export type PluginHealthSummary = {
errors: PluginHealthErrorSummary[];
};
/** One quarantined context engine as carried in a health summary. */
export type ContextEngineHealthQuarantineSummary = {
  /** Id of the quarantined engine. */
  engineId: string;
  /** Name of the operation that failed (for example `"assemble"`). */
  operation: string;
  /** Failure message recorded for the quarantine. */
  reason: string;
  /** Failure time as a number; the health builder fills it from `Date#getTime()`. */
  failedAt: number;
  /** Owner of the engine registration, when one is known. */
  owner?: string;
};
/** Context-engine section of a health summary. */
export type ContextEngineHealthSummary = {
  /** Engines currently quarantined, one entry per engine. */
  quarantined: Array<ContextEngineHealthQuarantineSummary>;
};
export type ModelPricingHealthSummary =
import("../gateway/model-pricing-cache-state.js").GatewayModelPricingHealth;
@@ -44,6 +56,7 @@ export type HealthSummary = {
durationMs: number;
eventLoop?: import("../gateway/server/event-loop-health.js").GatewayEventLoopHealth;
plugins?: PluginHealthSummary;
contextEngines?: ContextEngineHealthSummary;
modelPricing?: ModelPricingHealthSummary;
channels: Record<string, ChannelHealthSummary>;
channelOrder: string[];

View File

@@ -13,7 +13,9 @@ import { registerLegacyContextEngine } from "./legacy.registration.js";
import {
registerContextEngine,
registerContextEngineForOwner,
clearContextEngineRuntimeQuarantine,
getContextEngineFactory,
listContextEngineQuarantines,
listContextEngineIds,
resolveContextEngine,
resolveContextEngineOwnerPluginId,
@@ -584,6 +586,16 @@ describe("Registry tests", () => {
// ═══════════════════════════════════════════════════════════════════════════
describe("Legacy sessionKey compatibility", () => {
beforeEach(() => {
registerLegacyContextEngine();
clearContextEngineRuntimeQuarantine();
vi.spyOn(console, "error").mockImplementation(() => {});
});
afterEach(() => {
vi.restoreAllMocks();
});
it("memoizes legacy mode after the first strict compatibility retry", async () => {
const engineId = `legacy-sessionkey-${Date.now().toString(36)}`;
const strictEngine = new LegacySessionKeyStrictEngine(engineId);
@@ -655,24 +667,34 @@ describe("Legacy sessionKey compatibility", () => {
expect(strictEngine.maintainCalls[1]).not.toHaveProperty("sessionKey");
});
it("does not retry non-compat runtime errors", async () => {
it("quarantines and falls back for non-compat runtime errors", async () => {
const engineId = `sessionkey-runtime-${Date.now().toString(36)}`;
const runtimeErrorEngine = new SessionKeyRuntimeErrorEngine(engineId);
registerContextEngine(engineId, () => runtimeErrorEngine);
const engine = await resolveContextEngine(configWithSlot(engineId));
const message = makeMockMessage();
await expect(
engine.assemble({
sessionId: "s1",
sessionKey: "agent:main:test",
messages: [makeMockMessage()],
}),
).rejects.toThrow("sessionKey lookup failed");
const result = await engine.assemble({
sessionId: "s1",
sessionKey: "agent:main:test",
messages: [message],
});
const nextEngine = await resolveContextEngine(configWithSlot(engineId));
expect(result.messages).toEqual([message]);
expect(nextEngine.info.id).toBe("legacy");
expect(runtimeErrorEngine.assembleCalls).toBe(1);
expect(listContextEngineQuarantines()).toEqual([
expect.objectContaining({
engineId,
operation: "assemble",
reason: "sessionKey lookup failed",
}),
]);
});
it("does not treat 'Unknown sessionKey' runtime failures as schema-compat errors", async () => {
it("quarantines 'Unknown sessionKey' runtime failures instead of treating them as schema compat", async () => {
const engineId = `sessionkey-unknown-runtime-${Date.now().toString(36)}`;
const runtimeErrorEngine = new SessionKeyRuntimeErrorEngine(
engineId,
@@ -681,15 +703,23 @@ describe("Legacy sessionKey compatibility", () => {
registerContextEngine(engineId, () => runtimeErrorEngine);
const engine = await resolveContextEngine(configWithSlot(engineId));
const message = makeMockMessage();
await expect(
engine.assemble({
sessionId: "s1",
sessionKey: "agent:main:missing",
messages: [makeMockMessage()],
}),
).rejects.toThrow('Unknown sessionKey "agent:main:missing"');
const result = await engine.assemble({
sessionId: "s1",
sessionKey: "agent:main:missing",
messages: [message],
});
expect(result.messages).toEqual([message]);
expect(runtimeErrorEngine.assembleCalls).toBe(1);
expect(listContextEngineQuarantines()).toEqual([
expect.objectContaining({
engineId,
operation: "assemble",
reason: 'Unknown sessionKey "agent:main:missing"',
}),
]);
});
});
@@ -841,6 +871,7 @@ describe("Factory context passing", () => {
describe("Invalid engine fallback", () => {
beforeEach(() => {
registerLegacyContextEngine();
clearContextEngineRuntimeQuarantine();
vi.spyOn(console, "error").mockImplementation(() => {});
});
@@ -855,7 +886,7 @@ describe("Invalid engine fallback", () => {
engineId: uniqueEngineId("does-not-exist"),
register: () => undefined,
expectedError: (engineId: string) =>
`[context-engine] Context engine "${engineId}" is not registered; falling back to default engine "legacy".`,
`[context-engine] Context engine "${engineId}" failed during resolve: not registered; quarantining it for this process and falling back to default engine "legacy".`,
},
{
name: "factory throws",
@@ -866,7 +897,7 @@ describe("Invalid engine fallback", () => {
});
},
expectedError: (engineId: string) =>
`[context-engine] Context engine "${engineId}" factory threw during resolution: plugin version mismatch; falling back to default engine "legacy".`,
`[context-engine] Context engine "${engineId}" owner=public-sdk failed during factory: plugin version mismatch; quarantining it for this process and falling back to default engine "legacy".`,
},
{
name: "missing info metadata",
@@ -889,7 +920,7 @@ describe("Invalid engine fallback", () => {
);
},
expectedError: (engineId: string) =>
`[context-engine] Context engine "${engineId}" factory returned an invalid ContextEngine: missing info.; falling back to default engine "legacy".`,
`[context-engine] Context engine "${engineId}" owner=public-sdk failed during contract-validation: Context engine "${engineId}" factory returned an invalid ContextEngine: missing info.; quarantining it for this process and falling back to default engine "legacy".`,
},
{
name: "missing lifecycle methods",
@@ -907,7 +938,7 @@ describe("Invalid engine fallback", () => {
);
},
expectedError: (engineId: string) =>
`[context-engine] Context engine "${engineId}" factory returned an invalid ContextEngine: missing assemble(), missing compact().; falling back to default engine "legacy".`,
`[context-engine] Context engine "${engineId}" owner=public-sdk failed during contract-validation: Context engine "${engineId}" factory returned an invalid ContextEngine: missing assemble(), missing compact().; quarantining it for this process and falling back to default engine "legacy".`,
},
{
name: "contract validation throws",
@@ -916,7 +947,7 @@ describe("Invalid engine fallback", () => {
registerContextEngine(engineId, () => 42n as unknown as ContextEngine);
},
expectedError: (engineId: string) =>
`[context-engine] Context engine "${engineId}" contract validation threw: Do not know how to serialize a BigInt; falling back to default engine "legacy".`,
`[context-engine] Context engine "${engineId}" owner=public-sdk failed during contract-validation: Do not know how to serialize a BigInt; quarantining it for this process and falling back to default engine "legacy".`,
},
] as const;
@@ -930,9 +961,163 @@ describe("Invalid engine fallback", () => {
expect(console.error, testCase.name).toHaveBeenCalledWith(
testCase.expectedError(testCase.engineId),
);
expect(
listContextEngineQuarantines().some((entry) => entry.engineId === testCase.engineId),
).toBe(true);
}
});
// A lifecycle method that throws must quarantine the engine, still return a
// usable result for the failing call, and make the next resolution return legacy.
it("quarantines a selected engine after lifecycle failure and resolves legacy next time", async () => {
const engineId = uniqueEngineId("runtime-fail");
const assemble = vi.fn(async () => {
throw new Error("lcm db is corrupt");
});
// Counts factory invocations so the test can prove the broken engine is not rebuilt.
let factoryCalls = 0;
registerContextEngine(engineId, () => {
factoryCalls += 1;
return {
info: { id: "lcm", name: "Lossless Context Manager" },
async ingest() {
return { ingested: true };
},
assemble,
async compact() {
return { ok: true, compacted: false };
},
};
});
const engine = await resolveContextEngine(configWithSlot(engineId));
const message = makeMockMessage("user", "hello");
// The failing assemble call itself must resolve (via the fallback), not reject.
const result = await engine.assemble({
sessionId: "s1",
messages: [message],
});
// Second resolution happens after the failure, so it should see the quarantine.
const nextEngine = await resolveContextEngine(configWithSlot(engineId));
expect(result.messages).toEqual([message]);
expect(nextEngine.info.id).toBe("legacy");
expect(factoryCalls).toBe(1);
expect(assemble).toHaveBeenCalledTimes(1);
expect(listContextEngineQuarantines()).toEqual([
expect.objectContaining({
engineId,
owner: "public-sdk",
operation: "assemble",
reason: "lcm db is corrupt",
}),
]);
expect(console.error).toHaveBeenCalledWith(
`[context-engine] Context engine "${engineId}" owner=public-sdk failed during assemble: lcm db is corrupt; quarantining it for this process and falling back to default engine "legacy".`,
);
});
// Resolving an unregistered engine id quarantines it; registering that id
// afterwards must lift the quarantine so the real engine is used.
it("clears a missing-engine quarantine when the plugin registers later", async () => {
const engineId = uniqueEngineId("late-register");
// Nothing is registered under engineId yet, so this resolves to legacy.
const missingEngine = await resolveContextEngine(configWithSlot(engineId));
expect(missingEngine.info.id).toBe("legacy");
expect(listContextEngineQuarantines()).toEqual([
expect.objectContaining({
engineId,
operation: "resolve",
reason: "not registered",
}),
]);
registerContextEngine(engineId, () => ({
info: { id: engineId, name: "Late Registered Engine" },
async ingest() {
return { ingested: true };
},
async assemble({ messages }: { messages: AgentMessage[] }) {
return { messages, estimatedTokens: 0 };
},
async compact() {
return { ok: true, compacted: false };
},
}));
// Registration is expected to have cleared the quarantine recorded above.
const registeredEngine = await resolveContextEngine(configWithSlot(engineId));
expect(listContextEngineQuarantines()).toEqual([]);
expect(registeredEngine.info.id).toBe(engineId);
});
// A rejection that coincides with an aborted signal is a cancelled run, not an
// engine fault: it must propagate to the caller without quarantining or logging.
it("does not quarantine abort rejections from lifecycle methods", async () => {
const engineId = uniqueEngineId("abort-rejection");
const abortError = new Error("compaction aborted");
abortError.name = "AbortError";
const controller = new AbortController();
registerContextEngine(engineId, () => ({
info: { id: engineId, name: "Abort Aware Engine" },
async ingest() {
return { ingested: true };
},
async assemble({ messages }: { messages: AgentMessage[] }) {
return { messages, estimatedTokens: 0 };
},
async compact() {
// Abort while the call is in flight; the signal was not aborted when compact started.
// The abort reason is a different Error object than the one thrown below.
controller.abort(new Error("user stopped run"));
throw abortError;
},
}));
const engine = await resolveContextEngine(configWithSlot(engineId));
await expect(
engine.compact({
sessionId: "s1",
sessionFile: "/tmp/session.json",
abortSignal: controller.signal,
}),
).rejects.toThrow("compaction aborted");
// The same engine id must still resolve to the plugin engine afterwards.
const nextEngine = await resolveContextEngine(configWithSlot(engineId));
expect(nextEngine.info.id).toBe(engineId);
expect(listContextEngineQuarantines()).toEqual([]);
expect(console.error).not.toHaveBeenCalled();
});
// prepareSubagentSpawn is the exception to "fall back and continue": the failure
// is recorded as a quarantine, but the call that failed still rejects.
it("quarantines subagent preparation failures while failing the active spawn closed", async () => {
const engineId = uniqueEngineId("prepare-subagent-fail");
registerContextEngine(engineId, () => ({
info: { id: engineId, name: "Spawn Aware Engine" },
async ingest() {
return { ingested: true };
},
async assemble({ messages }: { messages: AgentMessage[] }) {
return { messages, estimatedTokens: 0 };
},
async compact() {
return { ok: true, compacted: false };
},
async prepareSubagentSpawn() {
throw new Error("child context projection failed");
},
}));
const engine = await resolveContextEngine(configWithSlot(engineId));
// The original error is rethrown rather than replaced by a fallback result.
await expect(
engine.prepareSubagentSpawn?.({
parentSessionKey: "agent:main",
childSessionKey: "agent:child",
contextMode: "isolated",
}),
).rejects.toThrow("child context projection failed");
// Later resolutions of the same id are downgraded to legacy.
const nextEngine = await resolveContextEngine(configWithSlot(engineId));
expect(nextEngine.info.id).toBe("legacy");
expect(listContextEngineQuarantines()).toEqual([
expect.objectContaining({
engineId,
operation: "prepareSubagentSpawn",
reason: "child context projection failed",
}),
]);
});
it("throws when the default engine itself is not registered", async () => {
// Access the process-global registry via the well-known symbol and clear it
// so even the default engine is missing. The symbol key must match the

View File

@@ -2,7 +2,16 @@ import type { OpenClawConfig } from "../config/types.js";
import { defaultSlotIdForKey } from "../plugins/slots.js";
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
import { sanitizeForLog } from "../terminal/ansi.js";
import type { ContextEngine } from "./types.js";
import type {
AssembleResult,
BootstrapResult,
CompactResult,
ContextEngine,
ContextEngineMaintenanceResult,
IngestBatchResult,
IngestResult,
SubagentSpawnPreparation,
} from "./types.js";
/**
* Runtime context passed to context engine factories during resolution.
@@ -320,9 +329,26 @@ function wrapContextEngineWithSessionKeyCompat(engine: ContextEngine): ContextEn
function wrapResolvedContextEngine(
engine: ContextEngine,
metadata: { owner: string },
metadata: {
owner: string;
engineId: string;
defaultEngineId?: string;
factoryCtx?: ContextEngineFactoryContext;
},
): ContextEngine {
const wrapped = wrapContextEngineWithSessionKeyCompat(engine);
const compatWrapped = wrapContextEngineWithSessionKeyCompat(engine);
const wrapped =
metadata.defaultEngineId &&
metadata.factoryCtx &&
metadata.engineId !== metadata.defaultEngineId
? wrapContextEngineWithRuntimeQuarantine({
engine: compatWrapped,
engineId: metadata.engineId,
owner: metadata.owner,
defaultEngineId: metadata.defaultEngineId,
factoryCtx: metadata.factoryCtx,
})
: compatWrapped;
RESOLVED_CONTEXT_ENGINE_METADATA.set(wrapped, metadata);
return wrapped;
}
@@ -335,6 +361,14 @@ const CONTEXT_ENGINE_REGISTRY_STATE = Symbol.for("openclaw.contextEngineRegistry
const CORE_CONTEXT_ENGINE_OWNER = "core";
const PUBLIC_CONTEXT_ENGINE_OWNER = "public-sdk";
/** Record kept for a context engine that has been quarantined in this process. */
export type ContextEngineRuntimeQuarantine = {
  /** Id of the quarantined engine. */
  engineId: string;
  /** Operation that failed, e.g. `"resolve"`, `"factory"`, or a lifecycle method name. */
  operation: string;
  /** Failure message captured when the quarantine was recorded. */
  reason: string;
  /** When the quarantine was recorded. */
  failedAt: Date;
  /** Owner of the engine registration; absent when no owner was supplied. */
  owner?: string;
};
type ContextEngineRegistryState = {
engines: Map<
string,
@@ -343,6 +377,7 @@ type ContextEngineRegistryState = {
owner: string;
}
>;
quarantinedEngines: Map<string, ContextEngineRuntimeQuarantine>;
};
// Keep context-engine registrations process-global so duplicated dist chunks
@@ -351,6 +386,7 @@ const contextEngineRegistryState = resolveGlobalSingleton<ContextEngineRegistryS
CONTEXT_ENGINE_REGISTRY_STATE,
() => ({
engines: new Map(),
quarantinedEngines: new Map(),
}),
);
@@ -368,6 +404,69 @@ function requireContextEngineOwner(owner: string): string {
return normalizedOwner;
}
/**
 * Turn an arbitrary thrown value into a non-empty, human-readable reason string.
 *
 * - `Error` instances yield their message, falling back to the error name when
 *   the message is empty (so the log line never reads "failed during X: ;").
 * - Primitives are stringified with `String()`, as before.
 * - Other objects yield their string `message` property when present, otherwise
 *   a JSON rendering instead of the uninformative `[object Object]`.
 *
 * @param error The value that was thrown or passed as the failure reason.
 * @returns A description suitable for logs and quarantine records. Never throws
 *   for unserialisable or null-prototype objects.
 */
function formatContextEngineError(error: unknown): string {
  if (error instanceof Error) {
    return error.message || error.name || "Unknown error";
  }
  if (typeof error !== "object" || error === null) {
    return String(error);
  }
  const message = (error as { message?: unknown }).message;
  if (typeof message === "string" && message.length > 0) {
    return message;
  }
  try {
    // JSON.stringify may return undefined at runtime (e.g. a toJSON that returns
    // undefined), so the result is re-checked instead of trusted.
    const serialized: unknown = JSON.stringify(error);
    if (typeof serialized === "string") {
      return serialized;
    }
  } catch {
    // Circular structures and BigInt members cannot be serialised; use the tag below.
  }
  // Unlike String(error), this cannot throw for null-prototype objects.
  return Object.prototype.toString.call(error);
}
/**
 * Quarantine an engine id and log the failure once.
 *
 * The first failure recorded for an engine id wins: if the id is already
 * quarantined the existing record is returned unchanged and nothing is logged.
 *
 * @param params.engineId Id of the failing engine (key of the quarantine map).
 * @param params.owner Optional registration owner; included in the record and
 *   the log line only when truthy.
 * @param params.operation Name of the operation that failed.
 * @param params.error Thrown value or reason; formatted into `reason`.
 * @param params.defaultEngineId Id of the engine named as the fallback in the log.
 * @returns The stored quarantine record.
 */
function recordContextEngineQuarantine(params: {
  engineId: string;
  owner?: string;
  operation: string;
  error: unknown;
  defaultEngineId: string;
}): ContextEngineRuntimeQuarantine {
  const { quarantinedEngines } = getContextEngineRegistryState();
  const alreadyRecorded = quarantinedEngines.get(params.engineId);
  if (alreadyRecorded) {
    return alreadyRecorded;
  }

  const record: ContextEngineRuntimeQuarantine = {
    engineId: params.engineId,
    operation: params.operation,
    reason: formatContextEngineError(params.error),
    failedAt: new Date(),
  };
  if (params.owner) {
    record.owner = params.owner;
  }
  quarantinedEngines.set(params.engineId, record);

  const ownerSuffix = params.owner ? ` owner=${sanitizeForLog(params.owner)}` : "";
  const headline = `[context-engine] Context engine "${sanitizeForLog(params.engineId)}"${ownerSuffix} failed during ${sanitizeForLog(params.operation)}: `;
  const detail = `${sanitizeForLog(record.reason)}; quarantining it for this process and falling back to default engine "${params.defaultEngineId}".`;
  console.error(headline + detail);
  return record;
}
/**
 * Look up the quarantine record for an engine id.
 *
 * @returns The stored record (not a copy), or `undefined` when the id is not quarantined.
 */
function getContextEngineQuarantine(engineId: string): ContextEngineRuntimeQuarantine | undefined {
  const { quarantinedEngines } = getContextEngineRegistryState();
  return quarantinedEngines.get(engineId);
}
/**
 * List every quarantined engine.
 *
 * Each returned record is a fresh copy (including a cloned `failedAt` date), so
 * callers cannot mutate registry state through the result.
 */
export function listContextEngineQuarantines(): ContextEngineRuntimeQuarantine[] {
  const { quarantinedEngines } = getContextEngineRegistryState();
  return Array.from(quarantinedEngines.values(), (stored) => {
    const copy: ContextEngineRuntimeQuarantine = {
      engineId: stored.engineId,
      operation: stored.operation,
      reason: stored.reason,
      failedAt: new Date(stored.failedAt),
    };
    if (stored.owner) {
      copy.owner = stored.owner;
    }
    return copy;
  });
}
/**
 * Lift context-engine quarantines.
 *
 * @param engineId When given, only that engine's quarantine is removed; when
 *   omitted (`undefined`), every quarantine is removed.
 */
export function clearContextEngineRuntimeQuarantine(engineId?: string): void {
  const { quarantinedEngines } = getContextEngineRegistryState();
  if (engineId !== undefined) {
    quarantinedEngines.delete(engineId);
    return;
  }
  quarantinedEngines.clear();
}
/**
* Register a context engine implementation under an explicit trusted owner.
*/
@@ -393,6 +492,7 @@ export function registerContextEngineForOwner(
return { ok: false, existingOwner: existing.owner };
}
registry.set(id, { factory, owner: normalizedOwner });
clearContextEngineRuntimeQuarantine(id);
return { ok: true };
}
@@ -426,10 +526,12 @@ export function listContextEngineIds(): string[] {
export function clearContextEnginesForOwner(owner: string): void {
const normalizedOwner = requireContextEngineOwner(owner);
const registry = getContextEngineRegistryState().engines;
const registryState = getContextEngineRegistryState();
const registry = registryState.engines;
for (const [id, entry] of registry.entries()) {
if (entry.owner === normalizedOwner) {
registry.delete(id);
registryState.quarantinedEngines.delete(id);
}
}
}
@@ -496,6 +598,206 @@ function describeResolvedContextEngineContractError(
return `Context engine "${engineId}" factory returned an invalid ContextEngine: ${issues.join(", ")}.`;
}
/** Names of the engine methods that the runtime-quarantine proxy intercepts. */
const GUARDED_CONTEXT_ENGINE_METHOD_NAMES = [
  "bootstrap",
  "maintain",
  "ingest",
  "ingestBatch",
  "afterTurn",
  "assemble",
  "compact",
  "prepareSubagentSpawn",
  "onSubagentEnded",
] as const;

/** Union of the guarded method names, derived from the list above. */
type GuardedContextEngineMethodName = (typeof GUARDED_CONTEXT_ENGINE_METHOD_NAMES)[number];

/** Lookup set keyed by `PropertyKey` so proxy `get` traps can test any property. */
const GUARDED_CONTEXT_ENGINE_METHODS = new Set<PropertyKey>(GUARDED_CONTEXT_ENGINE_METHOD_NAMES);
/**
 * Build the neutral "nothing happened" result for a guarded method.
 *
 * A fresh object is returned on every call.
 *
 * @param methodName Guarded method that needs a placeholder result.
 * @returns A no-op result for `bootstrap`, `maintain`, `ingest` and
 *   `ingestBatch`; `undefined` for `afterTurn`, `prepareSubagentSpawn` and
 *   `onSubagentEnded`.
 * @throws Error for `assemble` and `compact`, which have no placeholder result.
 */
function contextEngineFallbackResult(
  methodName: GuardedContextEngineMethodName,
): BootstrapResult | ContextEngineMaintenanceResult | IngestResult | IngestBatchResult | void {
  if (methodName === "assemble" || methodName === "compact") {
    throw new Error(`No legacy fallback result for ${methodName}`);
  }
  if (methodName === "bootstrap") {
    return {
      bootstrapped: false,
      reason: "context engine downgraded to legacy",
    };
  }
  if (methodName === "maintain") {
    return {
      changed: false,
      bytesFreed: 0,
      rewrittenEntries: 0,
      reason: "context engine downgraded to legacy",
    };
  }
  if (methodName === "ingest") {
    return { ingested: false };
  }
  if (methodName === "ingestBatch") {
    return { ingestedCount: 0 };
  }
  // afterTurn, prepareSubagentSpawn and onSubagentEnded produce no result.
  return undefined;
}
/**
 * Extract an abort signal from a lifecycle method's params object.
 *
 * The check is structural: any object-valued `abortSignal` property that has an
 * `aborted` member is accepted, without requiring a real `AbortSignal` instance.
 *
 * @param methodParams Whatever was passed to the lifecycle method.
 * @returns The signal-like value, or `undefined` when none is present.
 */
function contextEngineAbortSignal(methodParams: unknown): AbortSignal | undefined {
  if (typeof methodParams !== "object" || methodParams === null) {
    return undefined;
  }
  const candidate = (methodParams as { abortSignal?: unknown }).abortSignal;
  const looksLikeSignal =
    typeof candidate === "object" && candidate !== null && "aborted" in candidate;
  return looksLikeSignal ? (candidate as AbortSignal) : undefined;
}
/**
 * Produce the error to throw when a lifecycle call arrives with an already
 * aborted signal.
 *
 * @param methodParams Params object that may carry an `abortSignal`.
 * @returns `undefined` when there is no signal or it is not aborted. Otherwise
 *   the signal's `reason` if it is an `Error`, or a new error named
 *   `"AbortError"` whose message is the non-empty string reason or a generic
 *   default.
 */
function contextEngineAbortError(methodParams: unknown): Error | undefined {
  const abortSignal = contextEngineAbortSignal(methodParams);
  if (abortSignal === undefined || !abortSignal.aborted) {
    return undefined;
  }
  const abortReason: unknown = abortSignal.reason;
  if (abortReason instanceof Error) {
    return abortReason;
  }
  let message = "Context engine operation aborted.";
  if (typeof abortReason === "string" && abortReason) {
    message = abortReason;
  }
  const abortError = new Error(message);
  abortError.name = "AbortError";
  return abortError;
}
/**
 * Decide whether a rejection from a lifecycle method is just the run being
 * aborted rather than an engine failure.
 *
 * Nothing counts as an abort unless the params carry a signal that is aborted.
 * Given that, the rejection matches when it is the signal's own reason, an
 * `Error` named `"AbortError"`, or an `Error`/string whose text mentions
 * "abort", "cancelled" or "canceled" (case-insensitive).
 *
 * @param error The rejection value.
 * @param methodParams Params object that may carry an `abortSignal`.
 */
function isContextEngineAbortRejection(error: unknown, methodParams: unknown): boolean {
  const abortSignal = contextEngineAbortSignal(methodParams);
  if (!abortSignal?.aborted) {
    return false;
  }
  if (error === abortSignal.reason) {
    return true;
  }
  if (typeof error === "string") {
    return /abort|cancelled|canceled/iu.test(error);
  }
  if (!(error instanceof Error)) {
    return false;
  }
  const lowered = error.message.toLowerCase();
  if (error.name === "AbortError") {
    return true;
  }
  return ["abort", "cancelled", "canceled"].some((needle) => lowered.includes(needle));
}
/**
 * Run a guarded lifecycle method on the fallback engine.
 *
 * @param params.getFallbackEngine Resolves the engine to delegate to.
 * @param params.methodName Lifecycle method to invoke.
 * @param params.methodParams Argument forwarded unchanged to that method.
 * @returns The fallback engine's result when it implements the method (called
 *   with the fallback engine as `this`); otherwise the placeholder from
 *   `contextEngineFallbackResult`, which throws for `assemble` and `compact`.
 */
async function invokeFallbackContextEngineMethod(params: {
  getFallbackEngine: () => Promise<ContextEngine>;
  methodName: GuardedContextEngineMethodName;
  methodParams: unknown;
}): Promise<
  | AssembleResult
  | BootstrapResult
  | CompactResult
  | ContextEngineMaintenanceResult
  | IngestBatchResult
  | IngestResult
  | SubagentSpawnPreparation
  | void
> {
  type FallbackOutcome = Awaited<ReturnType<typeof invokeFallbackContextEngineMethod>>;

  const { getFallbackEngine, methodName, methodParams } = params;
  const delegate = await getFallbackEngine();
  const candidate = delegate[methodName] as ((methodParams: unknown) => unknown) | undefined;
  if (typeof candidate !== "function") {
    return contextEngineFallbackResult(methodName);
  }
  const outcome = await candidate.call(delegate, methodParams);
  return outcome as FallbackOutcome;
}
/**
 * Wrap an engine in a Proxy that quarantines it on the first lifecycle failure
 * and routes guarded calls to the default engine from then on.
 *
 * Only properties listed in GUARDED_CONTEXT_ENGINE_METHODS that are functions
 * are intercepted; every other property is passed through (functions bound to
 * the wrapped engine).
 *
 * For each guarded call, in order:
 * 1. If the params carry an already-aborted signal, throw the abort error
 *    without calling either engine.
 * 2. If the engine id is already quarantined, delegate to the fallback engine.
 * 3. Otherwise call the wrapped engine. A rejection that looks like an abort is
 *    rethrown untouched; any other rejection records a quarantine and the call
 *    is retried on the fallback engine.
 *
 * @param params.engine Engine to guard.
 * @param params.engineId Id used as the quarantine key.
 * @param params.owner Registration owner recorded with the quarantine.
 * @param params.defaultEngineId Id of the engine used as the fallback.
 * @param params.factoryCtx Factory context passed when resolving the fallback.
 * @returns The proxied engine.
 */
function wrapContextEngineWithRuntimeQuarantine(params: {
engine: ContextEngine;
engineId: string;
owner: string;
defaultEngineId: string;
factoryCtx: ContextEngineFactoryContext;
}): ContextEngine {
// The fallback engine is resolved lazily, at most once per wrapper.
// NOTE(review): because the promise itself is cached, a rejected resolution
// stays cached for the lifetime of this wrapper — confirm that is intended.
let fallbackEnginePromise: Promise<ContextEngine> | undefined;
const getFallbackEngine = () => {
fallbackEnginePromise ??= resolveDefaultContextEngine(
params.defaultEngineId,
params.factoryCtx,
);
return fallbackEnginePromise;
};
return new Proxy(params.engine, {
get(target, property, receiver) {
const value = Reflect.get(target, property, receiver);
// Unguarded members pass through; functions are bound to the real engine so
// their `this` is the target rather than the proxy.
if (typeof value !== "function" || !GUARDED_CONTEXT_ENGINE_METHODS.has(property)) {
return typeof value === "function" ? value.bind(target) : value;
}
const methodName = property as GuardedContextEngineMethodName;
// A new wrapper function is created on every property access.
return async (methodParams: unknown) => {
// Fail fast when the caller's signal is already aborted.
const aborted = contextEngineAbortError(methodParams);
if (aborted) {
throw aborted;
}
// Quarantine is checked per call, so a wrapper obtained before the failure
// also switches to the fallback engine.
if (getContextEngineQuarantine(params.engineId)) {
return await invokeFallbackContextEngineMethod({
getFallbackEngine,
methodName,
methodParams,
});
}
try {
return await (value as (methodParams: unknown) => unknown).call(target, methodParams);
} catch (error) {
// Aborts are the caller's doing, not an engine fault: no quarantine.
if (isContextEngineAbortRejection(error, methodParams)) {
throw error;
}
recordContextEngineQuarantine({
engineId: params.engineId,
owner: params.owner,
operation: methodName,
error,
defaultEngineId: params.defaultEngineId,
});
// Subagent spawn preparation is quarantined but still rejects for this call;
// no fallback result is substituted.
if (methodName === "prepareSubagentSpawn") {
throw error;
}
try {
return await invokeFallbackContextEngineMethod({
getFallbackEngine,
methodName,
methodParams,
});
} catch {
// If the fallback also fails, surface the original engine error and drop
// the fallback's error.
throw error;
}
}
};
},
});
}
// ---------------------------------------------------------------------------
// Resolution
// ---------------------------------------------------------------------------
@@ -543,6 +845,11 @@ export async function resolveContextEngine(
workspaceDir: options?.workspaceDir,
};
const quarantine = !isDefaultEngine ? getContextEngineQuarantine(engineId) : undefined;
if (quarantine) {
return resolveDefaultContextEngine(defaultEngineId, factoryCtx);
}
const entry = getContextEngineRegistryState().engines.get(engineId);
if (!entry) {
if (isDefaultEngine) {
@@ -551,10 +858,12 @@ export async function resolveContextEngine(
`Available engines: ${listContextEngineIds().join(", ") || "(none)"}`,
);
}
console.error(
`[context-engine] Context engine "${sanitizeForLog(engineId)}" is not registered; ` +
`falling back to default engine "${defaultEngineId}".`,
);
recordContextEngineQuarantine({
engineId,
operation: "resolve",
error: "not registered",
defaultEngineId,
});
return resolveDefaultContextEngine(defaultEngineId, factoryCtx);
}
@@ -565,11 +874,13 @@ export async function resolveContextEngine(
if (isDefaultEngine) {
throw factoryError;
}
console.error(
`[context-engine] Context engine "${sanitizeForLog(engineId)}" factory threw during resolution: ` +
`${sanitizeForLog(factoryError instanceof Error ? factoryError.message : String(factoryError))}; ` +
`falling back to default engine "${defaultEngineId}".`,
);
recordContextEngineQuarantine({
engineId,
owner: entry.owner,
operation: "factory",
error: factoryError,
defaultEngineId,
});
return resolveDefaultContextEngine(defaultEngineId, factoryCtx);
}
@@ -580,25 +891,35 @@ export async function resolveContextEngine(
if (isDefaultEngine) {
throw validationError;
}
console.error(
`[context-engine] Context engine "${sanitizeForLog(engineId)}" contract validation threw: ` +
`${sanitizeForLog(validationError instanceof Error ? validationError.message : String(validationError))}; ` +
`falling back to default engine "${defaultEngineId}".`,
);
recordContextEngineQuarantine({
engineId,
owner: entry.owner,
operation: "contract-validation",
error: validationError,
defaultEngineId,
});
return resolveDefaultContextEngine(defaultEngineId, factoryCtx);
}
if (contractError) {
if (isDefaultEngine) {
throw new Error(contractError);
}
// contractError includes engineId from plugin config; sanitizeForLog covers it
console.error(
`[context-engine] ${sanitizeForLog(contractError)}; falling back to default engine "${defaultEngineId}".`,
);
recordContextEngineQuarantine({
engineId,
owner: entry.owner,
operation: "contract-validation",
error: contractError,
defaultEngineId,
});
return resolveDefaultContextEngine(defaultEngineId, factoryCtx);
}
return wrapResolvedContextEngine(engine, { owner: entry.owner });
return wrapResolvedContextEngine(engine, {
owner: entry.owner,
engineId,
defaultEngineId,
factoryCtx,
});
}
/**
@@ -623,5 +944,8 @@ async function resolveDefaultContextEngine(
if (contractError) {
throw new Error(`[context-engine] ${contractError}`);
}
return wrapResolvedContextEngine(engine, { owner: defaultEntry.owner });
return wrapResolvedContextEngine(engine, {
owner: defaultEntry.owner,
engineId: defaultEngineId,
});
}

View File

@@ -2,6 +2,7 @@ import { ErrorCodes, errorShape } from "../../../packages/gateway-protocol/src/i
import type { ChannelAccountSnapshot } from "../../channels/plugins/types.public.js";
import type { ChannelHealthSummary, HealthSummary } from "../../commands/health.types.js";
import { getStatusSummary } from "../../commands/status.js";
import { listContextEngineQuarantines } from "../../context-engine/registry.js";
import { getGatewayModelPricingHealth } from "../model-pricing-cache-state.js";
import type { ChannelRuntimeSnapshot } from "../server-channel-runtime.types.js";
import { HEALTH_REFRESH_INTERVAL_MS } from "../server-constants.js";
@@ -87,9 +88,26 @@ function mergeCachedHealthRuntimeState(params: {
cached: HealthSummary;
eventLoop?: HealthSummary["eventLoop"];
}): HealthSummary {
const { contextEngines: _cachedContextEngines, ...cached } = params.cached;
const quarantinedContextEngines: NonNullable<HealthSummary["contextEngines"]>["quarantined"] = [];
for (const entry of listContextEngineQuarantines()) {
const summary: NonNullable<HealthSummary["contextEngines"]>["quarantined"][number] = {
engineId: entry.engineId,
operation: entry.operation,
reason: entry.reason,
failedAt: entry.failedAt.getTime(),
};
if (entry.owner) {
summary.owner = entry.owner;
}
quarantinedContextEngines.push(summary);
}
return {
...params.cached,
...cached,
...(params.eventLoop ? { eventLoop: params.eventLoop } : {}),
...(quarantinedContextEngines.length > 0
? { contextEngines: { quarantined: quarantinedContextEngines } }
: {}),
modelPricing: getGatewayModelPricingHealth({
enabled: params.cached.modelPricing?.state !== "disabled",
}),

View File

@@ -7,6 +7,13 @@ import { fileURLToPath } from "node:url";
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { validateExecApprovalRequestParams } from "../../../packages/gateway-protocol/src/index.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { registerLegacyContextEngine } from "../../context-engine/legacy.registration.js";
import {
clearContextEngineRuntimeQuarantine,
clearContextEnginesForOwner,
registerContextEngineForOwner,
resolveContextEngine,
} from "../../context-engine/registry.js";
import { emitAgentEvent } from "../../infra/agent-events.js";
import { formatZonedTimestamp } from "../../infra/format-time/format-datetime.js";
import {
@@ -3037,6 +3044,7 @@ describe("gateway healthHandlers.status scope handling", () => {
describe("gateway healthHandlers.health cache freshness", () => {
let healthHandlers: typeof import("./health.js").healthHandlers;
let pricingState: typeof import("../model-pricing-cache-state.js");
const contextEngineTestOwner = "plugin:health-test";
beforeAll(async () => {
({ healthHandlers } = await import("./health.js"));
@@ -3045,10 +3053,15 @@ describe("gateway healthHandlers.health cache freshness", () => {
// Per-test setup for the health cache freshness suite. Resets the gateway
// model-pricing cache state, then prepares the context-engine registry:
// registers the legacy engine (presumably so a fallback target exists for
// quarantined engines — confirm against registry.ts), drops any engines
// registered under this suite's test owner, and clears runtime quarantine
// state before the test body runs.
beforeEach(() => {
pricingState.clearGatewayModelPricingCacheState();
registerLegacyContextEngine();
clearContextEnginesForOwner(contextEngineTestOwner);
clearContextEngineRuntimeQuarantine();
});
// Per-test teardown: clears the model-pricing cache state, the engines
// registered under this suite's test owner, and the runtime quarantine state,
// mirroring the cleanup calls made in beforeEach.
afterEach(() => {
pricingState.clearGatewayModelPricingCacheState();
clearContextEnginesForOwner(contextEngineTestOwner);
clearContextEngineRuntimeQuarantine();
});
it("refreshes cached health when runtime channel lifecycle has changed", async () => {
@@ -3252,6 +3265,83 @@ describe("gateway healthHandlers.health cache freshness", () => {
});
});
// Verifies that a health response served from cache still carries the live
// context-engine quarantine list: an engine whose `assemble` throws is
// registered, resolved and invoked, and the cached health payload is then
// expected to report exactly one quarantined entry for it.
it("merges live context-engine quarantine state into cached health responses", async () => {
  const engineId = `health-context-engine-${Date.now()}`;
  // Silence console.error for the duration of the test (presumably the
  // quarantine path logs the failure — confirm against registry.ts).
  const consoleError = vi.spyOn(console, "error").mockImplementation(() => {});
  try {
    // Registration happens inside the try block so the console.error spy is
    // always restored, even if registering the engine throws.
    registerContextEngineForOwner(
      engineId,
      () => ({
        info: { id: "lcm", name: "Lossless Claw Memory" },
        ingest: async () => ({ ingested: false }),
        assemble: async () => {
          throw new Error("lcm transcript store is corrupt");
        },
        compact: async () => ({ ok: true, compacted: false }),
      }),
      contextEngineTestOwner,
    );
    const contextEngine = await resolveContextEngine({
      plugins: { slots: { contextEngine: engineId } },
    } as OpenClawConfig);
    // The engine's own assemble throws; this await must still resolve, otherwise
    // the test fails here before any health assertions run.
    await contextEngine.assemble({ sessionId: "s1", messages: [] });
    // Minimal cached health summary, deliberately without a `contextEngines`
    // field, so anything asserted below must come from live state.
    const cached = {
      ok: true,
      ts: Date.now(),
      durationMs: 1,
      channels: {},
      channelOrder: [],
      channelLabels: {},
      heartbeatSeconds: 0,
      defaultAgentId: "main",
      agents: [],
      sessions: { path: "/tmp/sessions.json", count: 0, recent: [] },
    };
    const respond = vi.fn();
    const refreshHealthSnapshot = vi.fn().mockResolvedValue(cached);
    await healthHandlers.health({
      req: {} as never,
      params: {} as never,
      respond: respond as never,
      context: {
        getHealthCache: () => cached,
        refreshHealthSnapshot,
        getRuntimeSnapshot: () => ({ channels: {}, channelAccounts: {} }),
        logHealth: { error: vi.fn() },
      } as never,
      client: { connect: { role: "operator", scopes: ["operator.read"] } } as never,
      isWebchatConnect: () => false,
    });
    // NOTE(review): mockCallArg is defined elsewhere in this file; it appears to
    // return argument `1` of call `0` on the mock — confirm.
    const payload = mockCallArg(respond, 0, 1) as
      | {
          contextEngines?: {
            quarantined?: Array<{
              engineId?: string;
              owner?: string;
              operation?: string;
              reason?: string;
              failedAt?: number;
            }>;
          };
        }
      | undefined;
    expect(payload?.contextEngines?.quarantined).toHaveLength(1);
    expect(payload?.contextEngines?.quarantined?.[0]).toMatchObject({
      engineId,
      owner: contextEngineTestOwner,
      operation: "assemble",
      reason: "lcm transcript store is corrupt",
    });
    expect(typeof payload?.contextEngines?.quarantined?.[0]?.failedAt).toBe("number");
    // The response must still be flagged as served from cache.
    expect(mockCallArg(respond, 0, 3)).toEqual({ cached: true });
  } finally {
    consoleError.mockRestore();
  }
});
it("refreshes cached health when a runtime account is missing from the cached account summary", async () => {
const cached = {
ok: true,

View File

@@ -27,6 +27,7 @@ function fullSurfaceInspectPayload(pluginId: string) {
id: pluginId,
enabled: true,
status: "loaded",
contextEngineIds: [pluginId],
channelIds: ["kitchen-sink-channel"],
providerIds: ["kitchen-sink-provider"],
speechProviderIds: ["kitchen-sink-speech"],
@@ -115,6 +116,72 @@ function runAssertInstalled({
}
}
/**
 * Runs the assertions script's `assert-installed` command against a fabricated
 * ClawHub install of the kitchen-sink fixture plugin.
 *
 * The function writes the plugin list, inspect payload, inspect-all payload and
 * an `installs.json` record to temporary locations, spawns the script with the
 * `KITCHEN_SINK_*` environment pointing at them, and removes every temporary
 * path afterwards regardless of the outcome.
 *
 * @param contextEngineIds - Context engine ids to place on the inspect
 *   payload's `plugin.contextEngineIds`; defaults to an empty list.
 * @returns The `spawnSync` result of the assertions script (utf8 encoded).
 */
function runAssertClawhubInstalled({
  contextEngineIds = [],
}: {
  contextEngineIds?: string[];
} = {}) {
  const pluginId = "openclaw-kitchen-sink-fixture";
  const clawhubSpec = "clawhub:@openclaw/kitchen-sink@latest";
  // Unique per invocation so concurrent runs do not share scratch files.
  const label = [
    "clawhub-context",
    process.pid,
    Date.now(),
    Math.random().toString(16).slice(2),
  ].join("-");

  const scratchRoot = tmpdir();
  const home = mkdtempSync(path.join(scratchRoot, "openclaw-kitchen-sink-home-"));
  const installPath = mkdtempSync(path.join(scratchRoot, "openclaw-kitchen-sink-install-"));

  // All scratch files share the `kitchen-sink-<label>-` prefix under the OS temp dir.
  const scratchFile = (suffix: string) =>
    path.join(scratchRoot, `kitchen-sink-${label}-${suffix}`);
  const pluginsJsonPath = scratchFile("plugins.json");
  const inspectJsonPath = scratchFile("inspect.json");
  const inspectAllJsonPath = scratchFile("inspect-all.json");
  // Not written by this helper; it is only removed during cleanup.
  const installPathMarker = scratchFile("install-path.txt");
  const installsPath = path.join(home, ".openclaw", "plugins", "installs.json");

  try {
    const inspectPayload = fullSurfaceInspectPayload(pluginId);
    inspectPayload.plugin.contextEngineIds = contextEngineIds;

    const installRecord = {
      artifactFormat: "zip",
      artifactKind: "legacy-zip",
      clawhubFamily: "code-plugin",
      clawhubPackage: "@openclaw/kitchen-sink",
      integrity: "sha256-test",
      installPath,
      resolvedSpec: clawhubSpec,
      resolvedVersion: "1.0.0",
      resolvedAt: 1,
      source: "clawhub",
      spec: clawhubSpec,
      version: "1.0.0",
    };

    writeJson(pluginsJsonPath, {
      diagnostics: [],
      plugins: [{ id: pluginId, status: "loaded" }],
    });
    writeJson(inspectJsonPath, inspectPayload);
    writeJson(inspectAllJsonPath, { diagnostics: [] });
    writeJson(installsPath, { installRecords: { [pluginId]: installRecord } });

    const scriptEnv = {
      ...process.env,
      HOME: home,
      KITCHEN_SINK_ID: pluginId,
      KITCHEN_SINK_LABEL: label,
      KITCHEN_SINK_SOURCE: "clawhub",
      KITCHEN_SINK_SPEC: clawhubSpec,
      KITCHEN_SINK_SURFACE_MODE: "basic",
      KITCHEN_SINK_TMP_DIR: scratchRoot,
    };
    const result = spawnSync(process.execPath, [ASSERTIONS_SCRIPT, "assert-installed"], {
      encoding: "utf8",
      env: scriptEnv,
    });
    return result;
  } finally {
    // Directories first (recursive), then the individual scratch files.
    for (const dir of [home, installPath]) {
      rmSync(dir, { force: true, recursive: true });
    }
    for (const file of [pluginsJsonPath, inspectJsonPath, inspectAllJsonPath, installPathMarker]) {
      rmSync(file, { force: true });
    }
  }
}
function runScanLogs({ home, scratchRoot }: { home: string; scratchRoot: string }) {
return spawnSync(process.execPath, [ASSERTIONS_SCRIPT, "scan-logs"], {
encoding: "utf8",
@@ -144,6 +211,21 @@ describe("kitchen-sink plugin assertions", () => {
expect(result.status).toBe(0);
});
// A ClawHub fixture whose inspect payload lists no context engines must make
// the assertions script exit non-zero and mention the missing engines.
it("requires ClawHub kitchen-sink fixtures to expose context engines", () => {
  const { status, stdout, stderr } = runAssertClawhubInstalled({ contextEngineIds: [] });
  const combinedOutput = `${stdout}\n${stderr}`;
  expect(status).not.toBe(0);
  expect(combinedOutput).toContain("context engines missing");
});
// With one context engine id on the inspect payload, the assertions script is
// expected to exit with status 0.
it("accepts ClawHub kitchen-sink fixtures with a context engine", () => {
  const contextEngineIds = ["openclaw-kitchen-sink-fixture"];
  const { status } = runAssertClawhubInstalled({ contextEngineIds });
  expect(status).toBe(0);
});
it("keeps exhaustive diagnostic matching available for synchronized fixtures", () => {
const result = runAssertInstalled({
diagnostics: diagnosticErrors(REQUIRED_FULL_DIAGNOSTIC_CANARIES),