diff --git a/docs/concepts/context-engine.md b/docs/concepts/context-engine.md index 9c95165330e..f251db9cef7 100644 --- a/docs/concepts/context-engine.md +++ b/docs/concepts/context-engine.md @@ -251,6 +251,20 @@ Native Codex and OpenClaw embedded agent runs satisfy `assemble-before-prompt`. Generic CLI backends do not, so engines that require it are rejected before the CLI process starts. +### Failure isolation + +OpenClaw isolates the selected plugin engine from the core reply path. If a +non-legacy engine is missing, fails contract validation, throws during factory +creation, or throws from a lifecycle method, OpenClaw quarantines that engine +for the current Gateway process and downgrades context-engine work to the +built-in `legacy` engine. The error is logged with the failed operation so the +operator can repair, update, or disable the plugin without the agent going +silent. + +Host requirement failures are different: when an engine declares that a runtime +lacks a required capability, OpenClaw fails closed before starting the run. That +protects engines that would corrupt state if they ran in an unsupported host. + ### ownsCompaction `ownsCompaction` controls whether OpenClaw runtime's built-in in-attempt auto-compaction stays enabled for the run: @@ -321,7 +335,7 @@ The slot is exclusive at run time - only one registered context engine is resolv - Use `openclaw doctor` to verify your engine is loading correctly. - If switching engines, existing sessions continue with their current history. The new engine takes over for future runs. -- Engine errors are logged and surfaced in diagnostics. If a plugin engine fails to register or the selected engine id cannot be resolved, OpenClaw does not fall back automatically; runs fail until you fix the plugin or switch `plugins.slots.contextEngine` back to `"legacy"`. +- Engine errors are logged and the selected plugin engine is quarantined for the current Gateway process. OpenClaw falls back to `legacy` for user turns so replies can continue, but you should still repair, update, disable, or uninstall the broken plugin. - For development, use `openclaw plugins install -l ./my-engine` to link a local plugin directory without copying. ## Related diff --git a/scripts/e2e/lib/clawhub-fixture-server.cjs b/scripts/e2e/lib/clawhub-fixture-server.cjs index 19304540418..957e9544f94 100644 --- a/scripts/e2e/lib/clawhub-fixture-server.cjs +++ b/scripts/e2e/lib/clawhub-fixture-server.cjs @@ -127,6 +127,28 @@ export default definePluginEntry({ docsPath: "/providers/kitchen-sink", auth: [], }); + api.registerContextEngine("${pluginId}", () => ({ + info: { + id: "${pluginId}", + name: "Kitchen Sink Context Engine", + }, + async ingest() { + return { ingested: false }; + }, + async assemble(params) { + return { + messages: params.messages, + estimatedTokens: 0, + }; + }, + async compact() { + return { + ok: true, + compacted: false, + reason: "kitchen-sink fixture does not compact", + }; + }, + })); api.registerChannel({ plugin: { id: "kitchen-sink-channel", @@ -151,6 +173,7 @@ export default definePluginEntry({ manifest: { id: pluginId, name: "OpenClaw Kitchen Sink", + kind: "context-engine", channels: ["kitchen-sink-channel"], channelConfigs: { "kitchen-sink-channel": { diff --git a/scripts/e2e/lib/kitchen-sink-plugin/assertions.mjs b/scripts/e2e/lib/kitchen-sink-plugin/assertions.mjs index 6bdbb4eeb48..9a03e647181 100644 --- a/scripts/e2e/lib/kitchen-sink-plugin/assertions.mjs +++ b/scripts/e2e/lib/kitchen-sink-plugin/assertions.mjs @@ -373,6 +373,9 @@ function assertInstalled() { expectIncludes(inspect.plugin?.channelIds, "kitchen-sink-channel", "channels"); expectIncludes(inspect.plugin?.providerIds, "kitchen-sink-provider", "providers"); } + if (source === "clawhub") { + expectIncludes(inspect.plugin?.contextEngineIds, pluginId, "context engines"); + } const diagnostics = [ ...(list.diagnostics || []), diff --git a/src/commands/health.test.ts b/src/commands/health.test.ts index 8ba7f56d9a4..63412feb93f 100644 --- a/src/commands/health.test.ts +++ b/src/commands/health.test.ts @@ -2,7 +2,12 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { stripAnsi } from "../terminal/ansi.js"; import { formatHealthCheckFailure } from "./health-format.js"; import type { HealthSummary } from "./health.js"; -import { formatHealthChannelLines, formatModelPricingHealthLine, healthCommand } from "./health.js"; +import { + formatContextEngineHealthLine, + formatHealthChannelLines, + formatModelPricingHealthLine, + healthCommand, +} from "./health.js"; const runtime = { log: vi.fn(), @@ -341,6 +346,31 @@ describe("healthCommand", () => { }); }); +describe("formatContextEngineHealthLine", () => { + it("summarizes quarantined context engines", () => { + const summary = createHealthSummary({ + channels: {}, + channelOrder: [], + channelLabels: {}, + }); + summary.contextEngines = { + quarantined: [ + { + engineId: "lossless-claw", + owner: "plugin:lossless-claw", + operation: "assemble", + reason: "db corrupt", + failedAt: 123, + }, + ], + }; + + expect(formatContextEngineHealthLine(summary)).toBe( + "Context engine: warning (1 quarantined; downgraded to legacy: lossless-claw)", + ); + }); +}); + describe("formatHealthCheckFailure", () => { it("keeps non-rich output stable", () => { const err = new Error("gateway closed (1006 abnormal closure): no close reason"); diff --git a/src/commands/health.ts b/src/commands/health.ts index 6d685c440e1..caaae71ad41 100644 --- a/src/commands/health.ts +++ b/src/commands/health.ts @@ -13,6 +13,7 @@ import { withProgress } from "../cli/progress.js"; import { getRuntimeConfig } from "../config/config.js"; import { resolveStorePath } from "../config/sessions/paths.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { listContextEngineQuarantines } from "../context-engine/registry.js"; import { buildGatewayConnectionDetails, callGateway, @@ -43,6 +44,7 @@ import type { AgentHealthSummary, ChannelAccountHealthSummary, ChannelHealthSummary, + ContextEngineHealthSummary, HealthSummary, PluginHealthErrorSummary, PluginHealthSummary, @@ -166,6 +168,32 @@ export function formatModelPricingHealthLine(summary: HealthSummary): string | n return `Model pricing: warning (optional pricing refresh degraded)${detail}`; } +function buildContextEngineHealthSummary(): ContextEngineHealthSummary | undefined { + const quarantined: ContextEngineHealthSummary["quarantined"] = []; + for (const entry of listContextEngineQuarantines()) { + const summary: ContextEngineHealthSummary["quarantined"][number] = { + engineId: entry.engineId, + operation: entry.operation, + reason: entry.reason, + failedAt: entry.failedAt.getTime(), + }; + if (entry.owner) { + summary.owner = entry.owner; + } + quarantined.push(summary); + } + return quarantined.length > 0 ? { quarantined } : undefined; +} + +export function formatContextEngineHealthLine(summary: HealthSummary): string | null { + const quarantined = summary.contextEngines?.quarantined ?? []; + if (quarantined.length === 0) { + return null; + } + const engines = quarantined.map((entry) => entry.engineId).join(", "); + return `Context engine: warning (${quarantined.length} quarantined; downgraded to legacy: ${engines})`; +} + const resolveHeartbeatSummary = (cfg: OpenClawConfig, agentId: string) => resolveHeartbeatSummaryForAgent(cfg, agentId); @@ -571,12 +599,14 @@ export async function getHealthSnapshot(params?: { } const pluginHealth = buildPluginHealthSummary(); + const contextEngineHealth = buildContextEngineHealthSummary(); const summary: HealthSummary = { ok: true, ts: Date.now(), durationMs: Date.now() - start, ...(params?.eventLoop ? { eventLoop: params.eventLoop } : {}), ...(pluginHealth ? { plugins: pluginHealth } : {}), + ...(contextEngineHealth ? { contextEngines: contextEngineHealth } : {}), modelPricing: getGatewayModelPricingHealth({ enabled: isGatewayModelPricingEnabled(cfg) }), channels, channelOrder, @@ -782,6 +812,10 @@ export async function healthCommand( if (modelPricingLine) { runtime.log(styleHealthChannelLine(modelPricingLine, rich)); } + const contextEngineLine = formatContextEngineHealthLine(summary); + if (contextEngineLine) { + runtime.log(styleHealthChannelLine(contextEngineLine, rich)); + } for (const plugin of displayPlugins) { const channelSummary = summary.channels?.[plugin.id]; if (!channelSummary || channelSummary.linked !== true) { diff --git a/src/commands/health.types.ts b/src/commands/health.types.ts index 16fea14e318..e4f9389d191 100644 --- a/src/commands/health.types.ts +++ b/src/commands/health.types.ts @@ -35,6 +35,18 @@ export type PluginHealthSummary = { errors: PluginHealthErrorSummary[]; }; +export type ContextEngineHealthQuarantineSummary = { + engineId: string; + owner?: string; + operation: string; + reason: string; + failedAt: number; +}; + +export type ContextEngineHealthSummary = { + quarantined: ContextEngineHealthQuarantineSummary[]; +}; + export type ModelPricingHealthSummary = import("../gateway/model-pricing-cache-state.js").GatewayModelPricingHealth; @@ -44,6 +56,7 @@ export type HealthSummary = { durationMs: number; eventLoop?: import("../gateway/server/event-loop-health.js").GatewayEventLoopHealth; plugins?: PluginHealthSummary; + contextEngines?: ContextEngineHealthSummary; modelPricing?: ModelPricingHealthSummary; channels: Record; channelOrder: string[]; diff --git a/src/context-engine/context-engine.test.ts b/src/context-engine/context-engine.test.ts index 6db7a25d540..c0099dc118f 100644 --- a/src/context-engine/context-engine.test.ts +++ b/src/context-engine/context-engine.test.ts @@ -13,7 +13,9 @@ import { registerLegacyContextEngine } from "./legacy.registration.js"; import { registerContextEngine, registerContextEngineForOwner, + clearContextEngineRuntimeQuarantine, getContextEngineFactory, + listContextEngineQuarantines, listContextEngineIds, resolveContextEngine, resolveContextEngineOwnerPluginId, @@ -584,6 +586,16 @@ describe("Registry tests", () => { // ═══════════════════════════════════════════════════════════════════════════ describe("Legacy sessionKey compatibility", () => { + beforeEach(() => { + registerLegacyContextEngine(); + clearContextEngineRuntimeQuarantine(); + vi.spyOn(console, "error").mockImplementation(() => {}); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + it("memoizes legacy mode after the first strict compatibility retry", async () => { const engineId = `legacy-sessionkey-${Date.now().toString(36)}`; const strictEngine = new LegacySessionKeyStrictEngine(engineId); @@ -655,24 +667,34 @@ describe("Legacy sessionKey compatibility", () => { expect(strictEngine.maintainCalls[1]).not.toHaveProperty("sessionKey"); }); - it("does not retry non-compat runtime errors", async () => { + it("quarantines and falls back for non-compat runtime errors", async () => { const engineId = `sessionkey-runtime-${Date.now().toString(36)}`; const runtimeErrorEngine = new SessionKeyRuntimeErrorEngine(engineId); registerContextEngine(engineId, () => runtimeErrorEngine); const engine = await resolveContextEngine(configWithSlot(engineId)); + const message = makeMockMessage(); - await expect( - engine.assemble({ - sessionId: "s1", - sessionKey: "agent:main:test", - messages: [makeMockMessage()], - }), - ).rejects.toThrow("sessionKey lookup failed"); + const result = await engine.assemble({ + sessionId: "s1", + sessionKey: "agent:main:test", + messages: [message], + }); + const nextEngine = await resolveContextEngine(configWithSlot(engineId)); + + expect(result.messages).toEqual([message]); + expect(nextEngine.info.id).toBe("legacy"); expect(runtimeErrorEngine.assembleCalls).toBe(1); + expect(listContextEngineQuarantines()).toEqual([ + expect.objectContaining({ + engineId, + operation: "assemble", + reason: "sessionKey lookup failed", + }), + ]); }); - it("does not treat 'Unknown sessionKey' runtime failures as schema-compat errors", async () => { + it("quarantines 'Unknown sessionKey' runtime failures instead of treating them as schema compat", async () => { const engineId = `sessionkey-unknown-runtime-${Date.now().toString(36)}`; const runtimeErrorEngine = new SessionKeyRuntimeErrorEngine( engineId, @@ -681,15 +703,23 @@ describe("Legacy sessionKey compatibility", () => { registerContextEngine(engineId, () => runtimeErrorEngine); const engine = await resolveContextEngine(configWithSlot(engineId)); + const message = makeMockMessage(); - await expect( - engine.assemble({ - sessionId: "s1", - sessionKey: "agent:main:missing", - messages: [makeMockMessage()], - }), - ).rejects.toThrow('Unknown sessionKey "agent:main:missing"'); + const result = await engine.assemble({ + sessionId: "s1", + sessionKey: "agent:main:missing", + messages: [message], + }); + + expect(result.messages).toEqual([message]); expect(runtimeErrorEngine.assembleCalls).toBe(1); + expect(listContextEngineQuarantines()).toEqual([ + expect.objectContaining({ + engineId, + operation: "assemble", + reason: 'Unknown sessionKey "agent:main:missing"', + }), + ]); }); }); @@ -841,6 +871,7 @@ describe("Factory context passing", () => { describe("Invalid engine fallback", () => { beforeEach(() => { registerLegacyContextEngine(); + clearContextEngineRuntimeQuarantine(); vi.spyOn(console, "error").mockImplementation(() => {}); }); @@ -855,7 +886,7 @@ describe("Invalid engine fallback", () => { engineId: uniqueEngineId("does-not-exist"), register: () => undefined, expectedError: (engineId: string) => - `[context-engine] Context engine "${engineId}" is not registered; falling back to default engine "legacy".`, + `[context-engine] Context engine "${engineId}" failed during resolve: not registered; quarantining it for this process and falling back to default engine "legacy".`, }, { name: "factory throws", @@ -866,7 +897,7 @@ describe("Invalid engine fallback", () => { }); }, expectedError: (engineId: string) => - `[context-engine] Context engine "${engineId}" factory threw during resolution: plugin version mismatch; falling back to default engine "legacy".`, + `[context-engine] Context engine "${engineId}" owner=public-sdk failed during factory: plugin version mismatch; quarantining it for this process and falling back to default engine "legacy".`, }, { name: "missing info metadata", @@ -889,7 +920,7 @@ describe("Invalid engine fallback", () => { ); }, expectedError: (engineId: string) => - `[context-engine] Context engine "${engineId}" factory returned an invalid ContextEngine: missing info.; falling back to default engine "legacy".`, + `[context-engine] Context engine "${engineId}" owner=public-sdk failed during contract-validation: Context engine "${engineId}" factory returned an invalid ContextEngine: missing info.; quarantining it for this process and falling back to default engine "legacy".`, }, { name: "missing lifecycle methods", @@ -907,7 +938,7 @@ describe("Invalid engine fallback", () => { ); }, expectedError: (engineId: string) => - `[context-engine] Context engine "${engineId}" factory returned an invalid ContextEngine: missing assemble(), missing compact().; falling back to default engine "legacy".`, + `[context-engine] Context engine "${engineId}" owner=public-sdk failed during contract-validation: Context engine "${engineId}" factory returned an invalid ContextEngine: missing assemble(), missing compact().; quarantining it for this process and falling back to default engine "legacy".`, }, { name: "contract validation throws", @@ -916,7 +947,7 @@ describe("Invalid engine fallback", () => { registerContextEngine(engineId, () => 42n as unknown as ContextEngine); }, expectedError: (engineId: string) => - `[context-engine] Context engine "${engineId}" contract validation threw: Do not know how to serialize a BigInt; falling back to default engine "legacy".`, + `[context-engine] Context engine "${engineId}" owner=public-sdk failed during contract-validation: Do not know how to serialize a BigInt; quarantining it for this process and falling back to default engine "legacy".`, }, ] as const; @@ -930,9 +961,163 @@ describe("Invalid engine fallback", () => { expect(console.error, testCase.name).toHaveBeenCalledWith( testCase.expectedError(testCase.engineId), ); + expect( + listContextEngineQuarantines().some((entry) => entry.engineId === testCase.engineId), + ).toBe(true); } }); + it("quarantines a selected engine after lifecycle failure and resolves legacy next time", async () => { + const engineId = uniqueEngineId("runtime-fail"); + const assemble = vi.fn(async () => { + throw new Error("lcm db is corrupt"); + }); + let factoryCalls = 0; + registerContextEngine(engineId, () => { + factoryCalls += 1; + return { + info: { id: "lcm", name: "Lossless Context Manager" }, + async ingest() { + return { ingested: true }; + }, + assemble, + async compact() { + return { ok: true, compacted: false }; + }, + }; + }); + + const engine = await resolveContextEngine(configWithSlot(engineId)); + const message = makeMockMessage("user", "hello"); + const result = await engine.assemble({ + sessionId: "s1", + messages: [message], + }); + const nextEngine = await resolveContextEngine(configWithSlot(engineId)); + + expect(result.messages).toEqual([message]); + expect(nextEngine.info.id).toBe("legacy"); + expect(factoryCalls).toBe(1); + expect(assemble).toHaveBeenCalledTimes(1); + expect(listContextEngineQuarantines()).toEqual([ + expect.objectContaining({ + engineId, + owner: "public-sdk", + operation: "assemble", + reason: "lcm db is corrupt", + }), + ]); + expect(console.error).toHaveBeenCalledWith( + `[context-engine] Context engine "${engineId}" owner=public-sdk failed during assemble: lcm db is corrupt; quarantining it for this process and falling back to default engine "legacy".`, + ); + }); + + it("clears a missing-engine quarantine when the plugin registers later", async () => { + const engineId = uniqueEngineId("late-register"); + const missingEngine = await resolveContextEngine(configWithSlot(engineId)); + + expect(missingEngine.info.id).toBe("legacy"); + expect(listContextEngineQuarantines()).toEqual([ + expect.objectContaining({ + engineId, + operation: "resolve", + reason: "not registered", + }), + ]); + + registerContextEngine(engineId, () => ({ + info: { id: engineId, name: "Late Registered Engine" }, + async ingest() { + return { ingested: true }; + }, + async assemble({ messages }: { messages: AgentMessage[] }) { + return { messages, estimatedTokens: 0 }; + }, + async compact() { + return { ok: true, compacted: false }; + }, + })); + + const registeredEngine = await resolveContextEngine(configWithSlot(engineId)); + + expect(listContextEngineQuarantines()).toEqual([]); + expect(registeredEngine.info.id).toBe(engineId); + }); + + it("does not quarantine abort rejections from lifecycle methods", async () => { + const engineId = uniqueEngineId("abort-rejection"); + const abortError = new Error("compaction aborted"); + abortError.name = "AbortError"; + const controller = new AbortController(); + registerContextEngine(engineId, () => ({ + info: { id: engineId, name: "Abort Aware Engine" }, + async ingest() { + return { ingested: true }; + }, + async assemble({ messages }: { messages: AgentMessage[] }) { + return { messages, estimatedTokens: 0 }; + }, + async compact() { + controller.abort(new Error("user stopped run")); + throw abortError; + }, + })); + + const engine = await resolveContextEngine(configWithSlot(engineId)); + + await expect( + engine.compact({ + sessionId: "s1", + sessionFile: "/tmp/session.json", + abortSignal: controller.signal, + }), + ).rejects.toThrow("compaction aborted"); + + const nextEngine = await resolveContextEngine(configWithSlot(engineId)); + expect(nextEngine.info.id).toBe(engineId); + expect(listContextEngineQuarantines()).toEqual([]); + expect(console.error).not.toHaveBeenCalled(); + }); + + it("quarantines subagent preparation failures while failing the active spawn closed", async () => { + const engineId = uniqueEngineId("prepare-subagent-fail"); + registerContextEngine(engineId, () => ({ + info: { id: engineId, name: "Spawn Aware Engine" }, + async ingest() { + return { ingested: true }; + }, + async assemble({ messages }: { messages: AgentMessage[] }) { + return { messages, estimatedTokens: 0 }; + }, + async compact() { + return { ok: true, compacted: false }; + }, + async prepareSubagentSpawn() { + throw new Error("child context projection failed"); + }, + })); + + const engine = await resolveContextEngine(configWithSlot(engineId)); + + await expect( + engine.prepareSubagentSpawn?.({ + parentSessionKey: "agent:main", + childSessionKey: "agent:child", + contextMode: "isolated", + }), + ).rejects.toThrow("child context projection failed"); + + const nextEngine = await resolveContextEngine(configWithSlot(engineId)); + expect(nextEngine.info.id).toBe("legacy"); + expect(listContextEngineQuarantines()).toEqual([ + expect.objectContaining({ + engineId, + operation: "prepareSubagentSpawn", + reason: "child context projection failed", + }), + ]); + }); + it("throws when the default engine itself is not registered", async () => { // Access the process-global registry via the well-known symbol and clear it // so even the default engine is missing. The symbol key must match the diff --git a/src/context-engine/registry.ts b/src/context-engine/registry.ts index cc4751d6e55..3e100f84670 100644 --- a/src/context-engine/registry.ts +++ b/src/context-engine/registry.ts @@ -2,7 +2,16 @@ import type { OpenClawConfig } from "../config/types.js"; import { defaultSlotIdForKey } from "../plugins/slots.js"; import { resolveGlobalSingleton } from "../shared/global-singleton.js"; import { sanitizeForLog } from "../terminal/ansi.js"; -import type { ContextEngine } from "./types.js"; +import type { + AssembleResult, + BootstrapResult, + CompactResult, + ContextEngine, + ContextEngineMaintenanceResult, + IngestBatchResult, + IngestResult, + SubagentSpawnPreparation, +} from "./types.js"; /** * Runtime context passed to context engine factories during resolution. @@ -320,9 +329,26 @@ function wrapContextEngineWithSessionKeyCompat(engine: ContextEngine): ContextEn function wrapResolvedContextEngine( engine: ContextEngine, - metadata: { owner: string }, + metadata: { + owner: string; + engineId: string; + defaultEngineId?: string; + factoryCtx?: ContextEngineFactoryContext; + }, ): ContextEngine { - const wrapped = wrapContextEngineWithSessionKeyCompat(engine); + const compatWrapped = wrapContextEngineWithSessionKeyCompat(engine); + const wrapped = + metadata.defaultEngineId && + metadata.factoryCtx && + metadata.engineId !== metadata.defaultEngineId + ? wrapContextEngineWithRuntimeQuarantine({ + engine: compatWrapped, + engineId: metadata.engineId, + owner: metadata.owner, + defaultEngineId: metadata.defaultEngineId, + factoryCtx: metadata.factoryCtx, + }) + : compatWrapped; RESOLVED_CONTEXT_ENGINE_METADATA.set(wrapped, metadata); return wrapped; } @@ -335,6 +361,14 @@ const CONTEXT_ENGINE_REGISTRY_STATE = Symbol.for("openclaw.contextEngineRegistry const CORE_CONTEXT_ENGINE_OWNER = "core"; const PUBLIC_CONTEXT_ENGINE_OWNER = "public-sdk"; +export type ContextEngineRuntimeQuarantine = { + engineId: string; + owner?: string; + operation: string; + reason: string; + failedAt: Date; +}; + type ContextEngineRegistryState = { engines: Map< string, @@ -343,6 +377,7 @@ type ContextEngineRegistryState = { owner: string; } >; + quarantinedEngines: Map; }; // Keep context-engine registrations process-global so duplicated dist chunks @@ -351,6 +386,7 @@ const contextEngineRegistryState = resolveGlobalSingleton ({ engines: new Map(), + quarantinedEngines: new Map(), }), ); @@ -368,6 +404,69 @@ function requireContextEngineOwner(owner: string): string { return normalizedOwner; } +function formatContextEngineError(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} + +function recordContextEngineQuarantine(params: { + engineId: string; + owner?: string; + operation: string; + error: unknown; + defaultEngineId: string; +}): ContextEngineRuntimeQuarantine { + const registryState = getContextEngineRegistryState(); + const existing = registryState.quarantinedEngines.get(params.engineId); + if (existing) { + return existing; + } + + const quarantine: ContextEngineRuntimeQuarantine = { + engineId: params.engineId, + operation: params.operation, + reason: formatContextEngineError(params.error), + failedAt: new Date(), + ...(params.owner ? { owner: params.owner } : {}), + }; + registryState.quarantinedEngines.set(params.engineId, quarantine); + const ownerSuffix = params.owner ? ` owner=${sanitizeForLog(params.owner)}` : ""; + console.error( + `[context-engine] Context engine "${sanitizeForLog(params.engineId)}"${ownerSuffix} failed during ${sanitizeForLog(params.operation)}: ` + + `${sanitizeForLog(quarantine.reason)}; quarantining it for this process and falling back to default engine "${params.defaultEngineId}".`, + ); + return quarantine; +} + +function getContextEngineQuarantine(engineId: string): ContextEngineRuntimeQuarantine | undefined { + return getContextEngineRegistryState().quarantinedEngines.get(engineId); +} + +export function listContextEngineQuarantines(): ContextEngineRuntimeQuarantine[] { + const quarantines: ContextEngineRuntimeQuarantine[] = []; + for (const entry of getContextEngineRegistryState().quarantinedEngines.values()) { + const quarantine: ContextEngineRuntimeQuarantine = { + engineId: entry.engineId, + operation: entry.operation, + reason: entry.reason, + failedAt: new Date(entry.failedAt), + }; + if (entry.owner) { + quarantine.owner = entry.owner; + } + quarantines.push(quarantine); + } + return quarantines; +} + +export function clearContextEngineRuntimeQuarantine(engineId?: string): void { + const quarantinedEngines = getContextEngineRegistryState().quarantinedEngines; + if (engineId === undefined) { + quarantinedEngines.clear(); + return; + } + quarantinedEngines.delete(engineId); +} + /** * Register a context engine implementation under an explicit trusted owner. */ @@ -393,6 +492,7 @@ export function registerContextEngineForOwner( return { ok: false, existingOwner: existing.owner }; } registry.set(id, { factory, owner: normalizedOwner }); + clearContextEngineRuntimeQuarantine(id); return { ok: true }; } @@ -426,10 +526,12 @@ export function listContextEngineIds(): string[] { export function clearContextEnginesForOwner(owner: string): void { const normalizedOwner = requireContextEngineOwner(owner); - const registry = getContextEngineRegistryState().engines; + const registryState = getContextEngineRegistryState(); + const registry = registryState.engines; for (const [id, entry] of registry.entries()) { if (entry.owner === normalizedOwner) { registry.delete(id); + registryState.quarantinedEngines.delete(id); } } } @@ -496,6 +598,206 @@ function describeResolvedContextEngineContractError( return `Context engine "${engineId}" factory returned an invalid ContextEngine: ${issues.join(", ")}.`; } +type GuardedContextEngineMethodName = + | "bootstrap" + | "maintain" + | "ingest" + | "ingestBatch" + | "afterTurn" + | "assemble" + | "compact" + | "prepareSubagentSpawn" + | "onSubagentEnded"; + +const GUARDED_CONTEXT_ENGINE_METHODS = new Set([ + "bootstrap", + "maintain", + "ingest", + "ingestBatch", + "afterTurn", + "assemble", + "compact", + "prepareSubagentSpawn", + "onSubagentEnded", +] satisfies GuardedContextEngineMethodName[]); + +function contextEngineFallbackResult( + methodName: GuardedContextEngineMethodName, +): BootstrapResult | ContextEngineMaintenanceResult | IngestResult | IngestBatchResult | void { + switch (methodName) { + case "bootstrap": + return { + bootstrapped: false, + reason: "context engine downgraded to legacy", + }; + case "maintain": + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + reason: "context engine downgraded to legacy", + }; + case "ingest": + return { ingested: false }; + case "ingestBatch": + return { ingestedCount: 0 }; + case "afterTurn": + case "prepareSubagentSpawn": + case "onSubagentEnded": + return undefined; + case "assemble": + case "compact": + throw new Error(`No legacy fallback result for ${methodName}`); + } +} + +function contextEngineAbortSignal(methodParams: unknown): AbortSignal | undefined { + if (!methodParams || typeof methodParams !== "object") { + return undefined; + } + const signal = (methodParams as { abortSignal?: unknown }).abortSignal; + if (signal && typeof signal === "object" && "aborted" in signal) { + return signal as AbortSignal; + } + return undefined; +} + +function contextEngineAbortError(methodParams: unknown): Error | undefined { + const signal = contextEngineAbortSignal(methodParams); + if (!signal?.aborted) { + return undefined; + } + const reason = signal.reason; + if (reason instanceof Error) { + return reason; + } + const error = new Error( + typeof reason === "string" && reason ? reason : "Context engine operation aborted.", + ); + error.name = "AbortError"; + return error; +} + +function isContextEngineAbortRejection(error: unknown, methodParams: unknown): boolean { + const signal = contextEngineAbortSignal(methodParams); + if (!signal?.aborted) { + return false; + } + if (error === signal.reason) { + return true; + } + if (error instanceof Error) { + const message = error.message.toLowerCase(); + return ( + error.name === "AbortError" || + message.includes("abort") || + message.includes("cancelled") || + message.includes("canceled") + ); + } + return typeof error === "string" && /abort|cancelled|canceled/iu.test(error); +} + +async function invokeFallbackContextEngineMethod(params: { + getFallbackEngine: () => Promise; + methodName: GuardedContextEngineMethodName; + methodParams: unknown; +}): Promise< + | AssembleResult + | BootstrapResult + | CompactResult + | ContextEngineMaintenanceResult + | IngestBatchResult + | IngestResult + | SubagentSpawnPreparation + | void +> { + const fallbackEngine = await params.getFallbackEngine(); + const fallbackMethod = fallbackEngine[params.methodName] as + | ((methodParams: unknown) => unknown) + | undefined; + if (typeof fallbackMethod === "function") { + return (await fallbackMethod.call(fallbackEngine, params.methodParams)) as + | AssembleResult + | BootstrapResult + | CompactResult + | ContextEngineMaintenanceResult + | IngestBatchResult + | IngestResult + | SubagentSpawnPreparation + | void; + } + return contextEngineFallbackResult(params.methodName); +} + +function wrapContextEngineWithRuntimeQuarantine(params: { + engine: ContextEngine; + engineId: string; + owner: string; + defaultEngineId: string; + factoryCtx: ContextEngineFactoryContext; +}): ContextEngine { + let fallbackEnginePromise: Promise | undefined; + const getFallbackEngine = () => { + fallbackEnginePromise ??= resolveDefaultContextEngine( + params.defaultEngineId, + params.factoryCtx, + ); + return fallbackEnginePromise; + }; + + return new Proxy(params.engine, { + get(target, property, receiver) { + const value = Reflect.get(target, property, receiver); + if (typeof value !== "function" || !GUARDED_CONTEXT_ENGINE_METHODS.has(property)) { + return typeof value === "function" ? value.bind(target) : value; + } + + const methodName = property as GuardedContextEngineMethodName; + return async (methodParams: unknown) => { + const aborted = contextEngineAbortError(methodParams); + if (aborted) { + throw aborted; + } + if (getContextEngineQuarantine(params.engineId)) { + return await invokeFallbackContextEngineMethod({ + getFallbackEngine, + methodName, + methodParams, + }); + } + + try { + return await (value as (methodParams: unknown) => unknown).call(target, methodParams); + } catch (error) { + if (isContextEngineAbortRejection(error, methodParams)) { + throw error; + } + recordContextEngineQuarantine({ + engineId: params.engineId, + owner: params.owner, + operation: methodName, + error, + defaultEngineId: params.defaultEngineId, + }); + if (methodName === "prepareSubagentSpawn") { + throw error; + } + try { + return await invokeFallbackContextEngineMethod({ + getFallbackEngine, + methodName, + methodParams, + }); + } catch { + throw error; + } + } + }; + }, + }); +} + // --------------------------------------------------------------------------- // Resolution // --------------------------------------------------------------------------- @@ -543,6 +845,11 @@ export async function resolveContextEngine( workspaceDir: options?.workspaceDir, }; + const quarantine = !isDefaultEngine ? getContextEngineQuarantine(engineId) : undefined; + if (quarantine) { + return resolveDefaultContextEngine(defaultEngineId, factoryCtx); + } + const entry = getContextEngineRegistryState().engines.get(engineId); if (!entry) { if (isDefaultEngine) { @@ -551,10 +858,12 @@ export async function resolveContextEngine( `Available engines: ${listContextEngineIds().join(", ") || "(none)"}`, ); } - console.error( - `[context-engine] Context engine "${sanitizeForLog(engineId)}" is not registered; ` + - `falling back to default engine "${defaultEngineId}".`, - ); + recordContextEngineQuarantine({ + engineId, + operation: "resolve", + error: "not registered", + defaultEngineId, + }); return resolveDefaultContextEngine(defaultEngineId, factoryCtx); } @@ -565,11 +874,13 @@ export async function resolveContextEngine( if (isDefaultEngine) { throw factoryError; } - console.error( - `[context-engine] Context engine "${sanitizeForLog(engineId)}" factory threw during resolution: ` + - `${sanitizeForLog(factoryError instanceof Error ? factoryError.message : String(factoryError))}; ` + - `falling back to default engine "${defaultEngineId}".`, - ); + recordContextEngineQuarantine({ + engineId, + owner: entry.owner, + operation: "factory", + error: factoryError, + defaultEngineId, + }); return resolveDefaultContextEngine(defaultEngineId, factoryCtx); } @@ -580,25 +891,35 @@ export async function resolveContextEngine( if (isDefaultEngine) { throw validationError; } - console.error( - `[context-engine] Context engine "${sanitizeForLog(engineId)}" contract validation threw: ` + - `${sanitizeForLog(validationError instanceof Error ? validationError.message : String(validationError))}; ` + - `falling back to default engine "${defaultEngineId}".`, - ); + recordContextEngineQuarantine({ + engineId, + owner: entry.owner, + operation: "contract-validation", + error: validationError, + defaultEngineId, + }); return resolveDefaultContextEngine(defaultEngineId, factoryCtx); } if (contractError) { if (isDefaultEngine) { throw new Error(contractError); } - // contractError includes engineId from plugin config; sanitizeForLog covers it - console.error( - `[context-engine] ${sanitizeForLog(contractError)}; falling back to default engine "${defaultEngineId}".`, - ); + recordContextEngineQuarantine({ + engineId, + owner: entry.owner, + operation: "contract-validation", + error: contractError, + defaultEngineId, + }); return resolveDefaultContextEngine(defaultEngineId, factoryCtx); } - return wrapResolvedContextEngine(engine, { owner: entry.owner }); + return wrapResolvedContextEngine(engine, { + owner: entry.owner, + engineId, + defaultEngineId, + factoryCtx, + }); } /** @@ -623,5 +944,8 @@ async function resolveDefaultContextEngine( if (contractError) { throw new Error(`[context-engine] ${contractError}`); } - return wrapResolvedContextEngine(engine, { owner: defaultEntry.owner }); + return wrapResolvedContextEngine(engine, { + owner: defaultEntry.owner, + engineId: defaultEngineId, + }); } diff --git a/src/gateway/server-methods/health.ts b/src/gateway/server-methods/health.ts index 8064a7c61c4..328548c0a69 100644 --- a/src/gateway/server-methods/health.ts +++ b/src/gateway/server-methods/health.ts @@ -2,6 +2,7 @@ import { ErrorCodes, errorShape } from "../../../packages/gateway-protocol/src/i import type { ChannelAccountSnapshot } from "../../channels/plugins/types.public.js"; import type { ChannelHealthSummary, HealthSummary } from "../../commands/health.types.js"; import { getStatusSummary } from "../../commands/status.js"; +import { listContextEngineQuarantines } from "../../context-engine/registry.js"; import { getGatewayModelPricingHealth } from "../model-pricing-cache-state.js"; import type { ChannelRuntimeSnapshot } from "../server-channel-runtime.types.js"; import { HEALTH_REFRESH_INTERVAL_MS } from "../server-constants.js"; @@ -87,9 +88,26 @@ function mergeCachedHealthRuntimeState(params: { cached: HealthSummary; eventLoop?: HealthSummary["eventLoop"]; }): HealthSummary { + const { contextEngines: _cachedContextEngines, ...cached } = params.cached; + const quarantinedContextEngines: NonNullable["quarantined"] = []; + for (const entry of listContextEngineQuarantines()) { + const summary: NonNullable["quarantined"][number] = { + engineId: entry.engineId, + operation: entry.operation, + reason: entry.reason, + failedAt: entry.failedAt.getTime(), + }; + if (entry.owner) { + summary.owner = entry.owner; + } + quarantinedContextEngines.push(summary); + } return { - ...params.cached, + ...cached, ...(params.eventLoop ? { eventLoop: params.eventLoop } : {}), + ...(quarantinedContextEngines.length > 0 + ? { contextEngines: { quarantined: quarantinedContextEngines } } + : {}), modelPricing: getGatewayModelPricingHealth({ enabled: params.cached.modelPricing?.state !== "disabled", }), diff --git a/src/gateway/server-methods/server-methods.test.ts b/src/gateway/server-methods/server-methods.test.ts index 92f10136caa..7606e25ddbc 100644 --- a/src/gateway/server-methods/server-methods.test.ts +++ b/src/gateway/server-methods/server-methods.test.ts @@ -7,6 +7,13 @@ import { fileURLToPath } from "node:url"; import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { validateExecApprovalRequestParams } from "../../../packages/gateway-protocol/src/index.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; +import { registerLegacyContextEngine } from "../../context-engine/legacy.registration.js"; +import { + clearContextEngineRuntimeQuarantine, + clearContextEnginesForOwner, + registerContextEngineForOwner, + resolveContextEngine, +} from "../../context-engine/registry.js"; import { emitAgentEvent } from "../../infra/agent-events.js"; import { formatZonedTimestamp } from "../../infra/format-time/format-datetime.js"; import { @@ -3037,6 +3044,7 @@ describe("gateway healthHandlers.status scope handling", () => { describe("gateway healthHandlers.health cache freshness", () => { let healthHandlers: typeof import("./health.js").healthHandlers; let pricingState: typeof import("../model-pricing-cache-state.js"); + const contextEngineTestOwner = "plugin:health-test"; beforeAll(async () => { ({ healthHandlers } = await import("./health.js")); @@ -3045,10 +3053,15 @@ describe("gateway healthHandlers.health cache freshness", () => { beforeEach(() => { pricingState.clearGatewayModelPricingCacheState(); + registerLegacyContextEngine(); + clearContextEnginesForOwner(contextEngineTestOwner); + clearContextEngineRuntimeQuarantine(); }); afterEach(() => { pricingState.clearGatewayModelPricingCacheState(); + clearContextEnginesForOwner(contextEngineTestOwner); + clearContextEngineRuntimeQuarantine(); }); it("refreshes cached health when runtime channel lifecycle has changed", async () => { @@ -3252,6 +3265,83 @@ describe("gateway healthHandlers.health cache freshness", () => { }); }); + it("merges live context-engine quarantine state into cached health responses", async () => { + const engineId = `health-context-engine-${Date.now()}`; + const consoleError = vi.spyOn(console, "error").mockImplementation(() => {}); + registerContextEngineForOwner( + engineId, + () => ({ + info: { id: "lcm", name: "Lossless Claw Memory" }, + ingest: async () => ({ ingested: false }), + assemble: async () => { + throw new Error("lcm transcript store is corrupt"); + }, + compact: async () => ({ ok: true, compacted: false }), + }), + contextEngineTestOwner, + ); + try { + const contextEngine = await resolveContextEngine({ + plugins: { slots: { contextEngine: engineId } }, + } as OpenClawConfig); + await contextEngine.assemble({ sessionId: "s1", messages: [] }); + + const cached = { + ok: true, + ts: Date.now(), + durationMs: 1, + channels: {}, + channelOrder: [], + channelLabels: {}, + heartbeatSeconds: 0, + defaultAgentId: "main", + agents: [], + sessions: { path: "/tmp/sessions.json", count: 0, recent: [] }, + }; + const respond = vi.fn(); + const refreshHealthSnapshot = vi.fn().mockResolvedValue(cached); + + await healthHandlers.health({ + req: {} as never, + params: {} as never, + respond: respond as never, + context: { + getHealthCache: () => cached, + refreshHealthSnapshot, + getRuntimeSnapshot: () => ({ channels: {}, channelAccounts: {} }), + logHealth: { error: vi.fn() }, + } as never, + client: { connect: { role: "operator", scopes: ["operator.read"] } } as never, + isWebchatConnect: () => false, + }); + + const payload = mockCallArg(respond, 0, 1) as + | { + contextEngines?: { + quarantined?: Array<{ + engineId?: string; + owner?: string; + operation?: string; + reason?: string; + failedAt?: number; + }>; + }; + } + | undefined; + expect(payload?.contextEngines?.quarantined).toHaveLength(1); + expect(payload?.contextEngines?.quarantined?.[0]).toMatchObject({ + engineId, + owner: contextEngineTestOwner, + operation: "assemble", + reason: "lcm transcript store is corrupt", + }); + expect(typeof payload?.contextEngines?.quarantined?.[0]?.failedAt).toBe("number"); + expect(mockCallArg(respond, 0, 3)).toEqual({ cached: true }); + } finally { + consoleError.mockRestore(); + } + }); + it("refreshes cached health when a runtime account is missing from the cached account summary", async () => { const cached = { ok: true, diff --git a/test/scripts/kitchen-sink-plugin-assertions.test.ts b/test/scripts/kitchen-sink-plugin-assertions.test.ts index d278cbe2b57..6fd42920c89 100644 --- a/test/scripts/kitchen-sink-plugin-assertions.test.ts +++ b/test/scripts/kitchen-sink-plugin-assertions.test.ts @@ -27,6 +27,7 @@ function fullSurfaceInspectPayload(pluginId: string) { id: pluginId, enabled: true, status: "loaded", + contextEngineIds: [pluginId], channelIds: ["kitchen-sink-channel"], providerIds: ["kitchen-sink-provider"], speechProviderIds: ["kitchen-sink-speech"], @@ -115,6 +116,72 @@ function runAssertInstalled({ } } +function runAssertClawhubInstalled({ + contextEngineIds = [], +}: { + contextEngineIds?: string[]; +} = {}) { + const label = `clawhub-context-${process.pid}-${Date.now()}-${Math.random().toString(16).slice(2)}`; + const pluginId = "openclaw-kitchen-sink-fixture"; + const home = mkdtempSync(path.join(tmpdir(), "openclaw-kitchen-sink-home-")); + const installPath = mkdtempSync(path.join(tmpdir(), "openclaw-kitchen-sink-install-")); + const scratchRoot = tmpdir(); + const pluginsJsonPath = path.join(scratchRoot, `kitchen-sink-${label}-plugins.json`); + const inspectJsonPath = path.join(scratchRoot, `kitchen-sink-${label}-inspect.json`); + const inspectAllJsonPath = path.join(scratchRoot, `kitchen-sink-${label}-inspect-all.json`); + const installPathMarker = path.join(scratchRoot, `kitchen-sink-${label}-install-path.txt`); + const installsPath = path.join(home, ".openclaw", "plugins", "installs.json"); + try { + const inspectPayload = fullSurfaceInspectPayload(pluginId); + inspectPayload.plugin.contextEngineIds = contextEngineIds; + writeJson(pluginsJsonPath, { + diagnostics: [], + plugins: [{ id: pluginId, status: "loaded" }], + }); + writeJson(inspectJsonPath, inspectPayload); + writeJson(inspectAllJsonPath, { diagnostics: [] }); + writeJson(installsPath, { + installRecords: { + [pluginId]: { + artifactFormat: "zip", + artifactKind: "legacy-zip", + clawhubFamily: "code-plugin", + clawhubPackage: "@openclaw/kitchen-sink", + integrity: "sha256-test", + installPath, + resolvedSpec: "clawhub:@openclaw/kitchen-sink@latest", + resolvedVersion: "1.0.0", + resolvedAt: 1, + source: "clawhub", + spec: "clawhub:@openclaw/kitchen-sink@latest", + version: "1.0.0", + }, + }, + }); + + return spawnSync(process.execPath, [ASSERTIONS_SCRIPT, "assert-installed"], { + encoding: "utf8", + env: { + ...process.env, + HOME: home, + KITCHEN_SINK_ID: pluginId, + KITCHEN_SINK_LABEL: label, + KITCHEN_SINK_SOURCE: "clawhub", + KITCHEN_SINK_SPEC: "clawhub:@openclaw/kitchen-sink@latest", + KITCHEN_SINK_SURFACE_MODE: "basic", + KITCHEN_SINK_TMP_DIR: scratchRoot, + }, + }); + } finally { + rmSync(home, { force: true, recursive: true }); + rmSync(installPath, { force: true, recursive: true }); + rmSync(pluginsJsonPath, { force: true }); + rmSync(inspectJsonPath, { force: true }); + rmSync(inspectAllJsonPath, { force: true }); + rmSync(installPathMarker, { force: true }); + } +} + function runScanLogs({ home, scratchRoot }: { home: string; scratchRoot: string }) { return spawnSync(process.execPath, [ASSERTIONS_SCRIPT, "scan-logs"], { encoding: "utf8", @@ -144,6 +211,21 @@ describe("kitchen-sink plugin assertions", () => { expect(result.status).toBe(0); }); + it("requires ClawHub kitchen-sink fixtures to expose context engines", () => { + const result = runAssertClawhubInstalled({ contextEngineIds: [] }); + + expect(result.status).not.toBe(0); + expect(`${result.stdout}\n${result.stderr}`).toContain("context engines missing"); + }); + + it("accepts ClawHub kitchen-sink fixtures with a context engine", () => { + const result = runAssertClawhubInstalled({ + contextEngineIds: ["openclaw-kitchen-sink-fixture"], + }); + + expect(result.status).toBe(0); + }); + it("keeps exhaustive diagnostic matching available for synchronized fixtures", () => { const result = runAssertInstalled({ diagnostics: diagnosticErrors(REQUIRED_FULL_DIAGNOSTIC_CANARIES),