From 1d61862adb0d89c3c619108a79e0aa952f49b9e5 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Tue, 28 Apr 2026 23:41:22 -0700 Subject: [PATCH] fix(gateway): yield after agent accepted ack --- .../lib/kitchen-sink-plugin/assertions.mjs | 11 ++- src/gateway/server-methods/agent.test.ts | 36 ++++++++ src/gateway/server-methods/agent.ts | 17 ++++ src/plugins/loader.test.ts | 85 ++++++++++++++++++ src/plugins/memory-state.test.ts | 8 ++ src/plugins/memory-state.ts | 16 +++- src/plugins/registry.ts | 89 ++++++++++++++++--- 7 files changed, 246 insertions(+), 16 deletions(-) diff --git a/scripts/e2e/lib/kitchen-sink-plugin/assertions.mjs b/scripts/e2e/lib/kitchen-sink-plugin/assertions.mjs index 7cdfea2cb7b..c00e45362fe 100644 --- a/scripts/e2e/lib/kitchen-sink-plugin/assertions.mjs +++ b/scripts/e2e/lib/kitchen-sink-plugin/assertions.mjs @@ -118,6 +118,11 @@ const expectIncludes = (listValue, expected, field) => { throw new Error(`${field} missing ${expected}: ${JSON.stringify(listValue)}`); } }; +const expectMissing = (listValue, expected, field) => { + if (Array.isArray(listValue) && listValue.includes(expected)) { + throw new Error(`${field} unexpectedly included ${expected}: ${JSON.stringify(listValue)}`); + } +}; function assertRealPathInside(parentPath, childPath, label) { const parentRealPath = fs.realpathSync(parentPath); @@ -221,11 +226,11 @@ function assertInstalled() { webFetchProviderIds: ["kitchen-sink-web-fetch-provider", "web fetch providers"], webSearchProviderIds: ["kitchen-sink-web-search-provider", "web search providers"], migrationProviderIds: ["kitchen-sink-migration-provider", "migration providers"], - agentHarnessIds: ["kitchen-sink-agent-harness", "agent harnesses"], }; for (const [field, [id, label]] of Object.entries(pluginSurfaceIds)) { expectIncludes(inspect.plugin?.[field], id, label); } + expectMissing(inspect.plugin?.agentHarnessIds, "kitchen-sink-agent-harness", "agent harnesses"); expectIncludes(inspect.services, "kitchen-sink-service", "services"); expectIncludes(inspect.commands, "kitchen-sink-command", "commands"); expectIncludes(toolNames, "kitchen-sink-tool", "tools"); @@ -241,11 +246,15 @@ function assertInstalled() { const expectedErrorMessages = new Set([ "only bundled plugins can register agent tool result middleware", + 'agent harness "kitchen-sink-agent-harness" registration missing required runtime methods', 'channel "kitchen-sink-channel-probe" registration missing required config helpers', "cli registration missing explicit commands metadata", "only bundled plugins can register Codex app-server extension factories", + 'compaction provider "kitchen-sink-compaction-provider" registration missing summarize', + "context engine registration missing id", "http route registration missing or invalid auth: /kitchen-sink/http-route", "plugin must own memory slot or declare contracts.memoryEmbeddingProviders for adapter: kitchen-sink-memory-embedding-provider", + "memory prompt supplement registration missing builder", ]); for (const message of errorMessages) { if (!expectedErrorMessages.has(message)) { diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index 3e64ad6e9aa..2b128e1dc6b 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -2491,6 +2491,42 @@ describe("gateway agent handler chat.abort integration", () => { expect((entry?.expiresAtMs ?? 0) - (entry?.startedAtMs ?? 0)).toBeGreaterThan(24 * 60 * 60_000); }); + it("yields after the accepted ack before dispatching heavy agent work", async () => { + prime(); + mocks.agentCommand.mockReturnValueOnce(new Promise(() => {})); + + const respond = vi.fn(); + const runId = "idem-yield-before-dispatch"; + const pending = invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { respond, reqId: runId }, + ); + + await Promise.resolve(); + await Promise.resolve(); + + expect(respond).toHaveBeenCalledWith( + true, + expect.objectContaining({ + runId, + status: "accepted", + }), + undefined, + { runId }, + ); + expect(mocks.agentCommand).not.toHaveBeenCalled(); + + await new Promise((resolve) => setImmediate(resolve)); + await pending; + + expect(mocks.agentCommand).toHaveBeenCalledTimes(1); + }); + it("uses the explicit no-timeout agent expiry instead of the chat 24h cap", async () => { prime(); mocks.agentCommand.mockReturnValueOnce(new Promise(() => {})); diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 83f9671dca0..3c55002d9b5 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -1,4 +1,5 @@ import { randomUUID } from "node:crypto"; +import { MessageChannel } from "node:worker_threads"; import { listAgentIds, resolveDefaultAgentId, @@ -392,6 +393,18 @@ function dispatchAgentRunFromGateway(params: { }); } +function yieldAfterAgentAcceptedAck(): Promise { + return new Promise((resolve) => { + const channel = new MessageChannel(); + channel.port1.on("message", () => { + channel.port1.close(); + channel.port2.close(); + resolve(); + }); + channel.port2.postMessage(undefined); + }); +} + export const agentHandlers: GatewayRequestHandlers = { agent: async ({ params, respond, context, client, isWebchatConnect }) => { const p = params; @@ -1116,6 +1129,10 @@ export const agentHandlers: GatewayRequestHandlers = { }, }); respond(true, accepted, undefined, { runId }); + // Give the accepted frame one event-loop turn to flush before the runner + // starts potentially heavy synchronous prompt/context setup. Otherwise a + // hot pre-turn path can starve the WebSocket caller until it times out. + await yieldAfterAgentAcceptedAck(); let dispatched = false; try { diff --git a/src/plugins/loader.test.ts b/src/plugins/loader.test.ts index d82263b9456..c1dd8d56093 100644 --- a/src/plugins/loader.test.ts +++ b/src/plugins/loader.test.ts @@ -25,6 +25,7 @@ import { withEnv } from "../test-utils/env.js"; import { resolveBundledRuntimeDependencyInstallRootPlan } from "./bundled-runtime-deps.js"; import { clearPluginCommands } from "./command-registry-state.js"; import { getPluginCommandSpecs } from "./command-specs.js"; +import { listCompactionProviderIds } from "./compaction-provider.js"; import { getGlobalHookRunner, getGlobalPluginRegistry, @@ -72,6 +73,7 @@ import { getMemoryRuntime, listActiveMemoryPublicArtifacts, listMemoryCorpusSupplements, + listMemoryPromptSupplements, registerMemoryCorpusSupplement, registerMemoryFlushPlanResolver, registerMemoryPromptSupplement, @@ -3386,6 +3388,44 @@ module.exports = { id: "throws-after-import", register() {} };`, expect(listAgentHarnessIds()).toEqual([]); }); + it("rejects malformed plugin agent harness registrations", () => { + useNoBundledPlugins(); + const plugin = writePlugin({ + id: "bad-harness", + filename: "bad-harness.cjs", + body: `module.exports = { + id: "bad-harness", + register(api) { + api.registerAgentHarness({ + id: "broken", + label: "Broken", + }); + }, + };`, + }); + + const registry = loadOpenClawPlugins({ + cache: false, + workspaceDir: plugin.dir, + config: { + plugins: { + load: { paths: [plugin.file] }, + allow: ["bad-harness"], + }, + }, + onlyPluginIds: ["bad-harness"], + }); + + expect(listAgentHarnessIds()).toEqual([]); + expect(registry.diagnostics).toContainEqual( + expect.objectContaining({ + level: "error", + pluginId: "bad-harness", + message: 'agent harness "broken" registration missing required runtime methods', + }), + ); + }); + it("does not register internal hooks globally during non-activating loads", () => { useNoBundledPlugins(); const plugin = writePlugin({ @@ -5120,6 +5160,21 @@ module.exports = { id: "throws-after-import", register() {} };`, ).toBe("Demo Duplicate"); }, }, + { + label: "rejects malformed plugin context engine registration", + pluginId: "context-engine-malformed", + body: `module.exports = { id: "context-engine-malformed", register(api) { + api.registerContextEngine({ id: "broken-context" }); +} };`, + assert: (registry: ReturnType) => { + expectRegistryErrorDiagnostic({ + registry, + pluginId: "context-engine-malformed", + message: "context engine registration missing id", + }); + expect(listContextEngineIds()).not.toContain("broken-context"); + }, + }, { label: "rejects plugin context engine ids reserved by core", pluginId: "context-engine-core-collision", @@ -5134,6 +5189,36 @@ module.exports = { id: "throws-after-import", register() {} };`, }); }, }, + { + label: "rejects malformed compaction provider registration", + pluginId: "compaction-provider-malformed", + body: `module.exports = { id: "compaction-provider-malformed", register(api) { + api.registerCompactionProvider({ id: "broken-compaction", label: "Broken" }); +} };`, + assert: (registry: ReturnType) => { + expectRegistryErrorDiagnostic({ + registry, + pluginId: "compaction-provider-malformed", + message: 'compaction provider "broken-compaction" registration missing summarize', + }); + expect(listCompactionProviderIds()).not.toContain("broken-compaction"); + }, + }, + { + label: "rejects malformed memory prompt supplement registration", + pluginId: "memory-prompt-supplement-malformed", + body: `module.exports = { id: "memory-prompt-supplement-malformed", register(api) { + api.registerMemoryPromptSupplement({ id: "broken-memory-prompt" }); +} };`, + assert: (registry: ReturnType) => { + expectRegistryErrorDiagnostic({ + registry, + pluginId: "memory-prompt-supplement-malformed", + message: "memory prompt supplement registration missing builder", + }); + expect(listMemoryPromptSupplements()).toEqual([]); + }, + }, { label: "requires plugin CLI registrars to declare explicit command roots", pluginId: "cli-missing-metadata", diff --git a/src/plugins/memory-state.test.ts b/src/plugins/memory-state.test.ts index c1c37353d20..8aebbba2baf 100644 --- a/src/plugins/memory-state.test.ts +++ b/src/plugins/memory-state.test.ts @@ -206,6 +206,14 @@ describe("memory plugin state", () => { ]); }); + it("ignores malformed prompt builder output", () => { + registerMemoryPromptSection(() => ["primary", 1, undefined] as never); + registerMemoryPromptSupplement("async-helper", () => Promise.resolve(["async"]) as never); + registerMemoryPromptSupplement("valid-helper", () => ["valid", false] as never); + + expect(buildMemoryPromptSection({ availableTools: new Set() })).toEqual(["primary", "valid"]); + }); + it("stores memory corpus supplements", async () => { const supplement = { search: async () => [{ corpus: "wiki", path: "sources/alpha.md", score: 1, snippet: "x" }], diff --git a/src/plugins/memory-state.ts b/src/plugins/memory-state.ts index 216a13b07bd..474019c7f8e 100644 --- a/src/plugins/memory-state.ts +++ b/src/plugins/memory-state.ts @@ -208,17 +208,25 @@ export function buildMemoryPromptSection(params: { availableTools: Set; citationsMode?: MemoryCitationsMode; }): string[] { - const primary = + const primary = normalizeMemoryPromptLines( memoryPluginState.capability?.capability.promptBuilder?.(params) ?? - memoryPluginState.promptBuilder?.(params) ?? - []; + memoryPluginState.promptBuilder?.(params) ?? + [], + ); const supplements = memoryPluginState.promptSupplements // Keep supplement order stable even if plugin registration order changes. .toSorted((left, right) => left.pluginId.localeCompare(right.pluginId)) - .flatMap((registration) => registration.builder(params)); + .flatMap((registration) => normalizeMemoryPromptLines(registration.builder(params))); return [...primary, ...supplements]; } +function normalizeMemoryPromptLines(value: unknown): string[] { + if (!Array.isArray(value)) { + return []; + } + return value.filter((line): line is string => typeof line === "string"); +} + export function getMemoryPromptSectionBuilder(): MemoryPromptSectionBuilder | undefined { return memoryPluginState.capability?.capability.promptBuilder ?? memoryPluginState.promptBuilder; } diff --git a/src/plugins/registry.ts b/src/plugins/registry.ts index 2ea3890410a..f42014fbcc4 100644 --- a/src/plugins/registry.ts +++ b/src/plugins/registry.ts @@ -805,7 +805,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { }; const registerAgentHarness = (record: PluginRecord, harness: AgentHarness) => { - const id = harness.id.trim(); + const id = normalizeOptionalString((harness as Partial | undefined)?.id) ?? ""; if (!id) { pushDiagnostic({ level: "error", @@ -815,6 +815,15 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { }); return; } + if (typeof harness.supports !== "function" || typeof harness.runAttempt !== "function") { + pushDiagnostic({ + level: "error", + pluginId: record.id, + source: record.source, + message: `agent harness "${id}" registration missing required runtime methods`, + }); + return; + } const existing = registryParams.activateGlobalSideEffects === false ? registry.agentHarnesses.find((entry) => entry.harness.id === id) @@ -2055,35 +2064,84 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { registerConversationBindingResolvedHandler(record, handler), registerCommand: (command) => registerCommand(record, command), registerContextEngine: (id, factory) => { - if (id === defaultSlotIdForKey("contextEngine")) { + const normalizedId = normalizeOptionalString(id) ?? ""; + if (!normalizedId) { pushDiagnostic({ level: "error", pluginId: record.id, source: record.source, - message: `context engine id reserved by core: ${id}`, + message: "context engine registration missing id", }); return; } - const result = registerContextEngineForOwner(id, factory, `plugin:${record.id}`, { - allowSameOwnerRefresh: true, - }); + if (typeof factory !== "function") { + pushDiagnostic({ + level: "error", + pluginId: record.id, + source: record.source, + message: `context engine "${normalizedId}" registration missing factory`, + }); + return; + } + if (normalizedId === defaultSlotIdForKey("contextEngine")) { + pushDiagnostic({ + level: "error", + pluginId: record.id, + source: record.source, + message: `context engine id reserved by core: ${normalizedId}`, + }); + return; + } + const result = registerContextEngineForOwner( + normalizedId, + factory, + `plugin:${record.id}`, + { + allowSameOwnerRefresh: true, + }, + ); if (!result.ok) { pushDiagnostic({ level: "error", pluginId: record.id, source: record.source, - message: `context engine already registered: ${id} (${result.existingOwner})`, + message: `context engine already registered: ${normalizedId} (${result.existingOwner})`, }); return; } - if (!record.contextEngineIds?.includes(id)) { - record.contextEngineIds = [...(record.contextEngineIds ?? []), id]; + if (!record.contextEngineIds?.includes(normalizedId)) { + record.contextEngineIds = [...(record.contextEngineIds ?? []), normalizedId]; } }, registerCompactionProvider: ( provider: Parameters[0], ) => { - const existing = getRegisteredCompactionProvider(provider.id); + const id = normalizeOptionalString( + ( + provider as Partial< + Parameters[0] + > | null + )?.id, + ); + if (!id) { + pushDiagnostic({ + level: "error", + pluginId: record.id, + source: record.source, + message: "compaction provider registration missing id", + }); + return; + } + if (typeof provider?.summarize !== "function") { + pushDiagnostic({ + level: "error", + pluginId: record.id, + source: record.source, + message: `compaction provider "${id}" registration missing summarize`, + }); + return; + } + const existing = getRegisteredCompactionProvider(id); if (existing) { const ownerDetail = existing.ownerPluginId ? ` (owner: ${existing.ownerPluginId})` @@ -2092,7 +2150,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { level: "error", pluginId: record.id, source: record.source, - message: `compaction provider already registered: ${provider.id}${ownerDetail}`, + message: `compaction provider already registered: ${id}${ownerDetail}`, }); return; } @@ -2185,6 +2243,15 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { registerMemoryPromptSection(builder); }, registerMemoryPromptSupplement: (builder) => { + if (typeof builder !== "function") { + pushDiagnostic({ + level: "error", + pluginId: record.id, + source: record.source, + message: "memory prompt supplement registration missing builder", + }); + return; + } registerMemoryPromptSupplement(record.id, builder); }, registerMemoryCorpusSupplement: (supplement) => {