fix(gateway): yield after agent accepted ack

This commit is contained in:
Vincent Koc
2026-04-28 23:41:22 -07:00
parent d95719d7c1
commit 1d61862adb
7 changed files with 246 additions and 16 deletions

View File

@@ -118,6 +118,11 @@ const expectIncludes = (listValue, expected, field) => {
throw new Error(`${field} missing ${expected}: ${JSON.stringify(listValue)}`);
}
};
const expectMissing = (listValue, expected, field) => {
if (Array.isArray(listValue) && listValue.includes(expected)) {
throw new Error(`${field} unexpectedly included ${expected}: ${JSON.stringify(listValue)}`);
}
};
function assertRealPathInside(parentPath, childPath, label) {
const parentRealPath = fs.realpathSync(parentPath);
@@ -221,11 +226,11 @@ function assertInstalled() {
webFetchProviderIds: ["kitchen-sink-web-fetch-provider", "web fetch providers"],
webSearchProviderIds: ["kitchen-sink-web-search-provider", "web search providers"],
migrationProviderIds: ["kitchen-sink-migration-provider", "migration providers"],
agentHarnessIds: ["kitchen-sink-agent-harness", "agent harnesses"],
};
for (const [field, [id, label]] of Object.entries(pluginSurfaceIds)) {
expectIncludes(inspect.plugin?.[field], id, label);
}
expectMissing(inspect.plugin?.agentHarnessIds, "kitchen-sink-agent-harness", "agent harnesses");
expectIncludes(inspect.services, "kitchen-sink-service", "services");
expectIncludes(inspect.commands, "kitchen-sink-command", "commands");
expectIncludes(toolNames, "kitchen-sink-tool", "tools");
@@ -241,11 +246,15 @@ function assertInstalled() {
const expectedErrorMessages = new Set([
"only bundled plugins can register agent tool result middleware",
'agent harness "kitchen-sink-agent-harness" registration missing required runtime methods',
'channel "kitchen-sink-channel-probe" registration missing required config helpers',
"cli registration missing explicit commands metadata",
"only bundled plugins can register Codex app-server extension factories",
'compaction provider "kitchen-sink-compaction-provider" registration missing summarize',
"context engine registration missing id",
"http route registration missing or invalid auth: /kitchen-sink/http-route",
"plugin must own memory slot or declare contracts.memoryEmbeddingProviders for adapter: kitchen-sink-memory-embedding-provider",
"memory prompt supplement registration missing builder",
]);
for (const message of errorMessages) {
if (!expectedErrorMessages.has(message)) {

View File

@@ -2491,6 +2491,42 @@ describe("gateway agent handler chat.abort integration", () => {
expect((entry?.expiresAtMs ?? 0) - (entry?.startedAtMs ?? 0)).toBeGreaterThan(24 * 60 * 60_000);
});
it("yields after the accepted ack before dispatching heavy agent work", async () => {
prime();
mocks.agentCommand.mockReturnValueOnce(new Promise(() => {}));
const respond = vi.fn();
const runId = "idem-yield-before-dispatch";
const pending = invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ respond, reqId: runId },
);
await Promise.resolve();
await Promise.resolve();
expect(respond).toHaveBeenCalledWith(
true,
expect.objectContaining({
runId,
status: "accepted",
}),
undefined,
{ runId },
);
expect(mocks.agentCommand).not.toHaveBeenCalled();
await new Promise((resolve) => setImmediate(resolve));
await pending;
expect(mocks.agentCommand).toHaveBeenCalledTimes(1);
});
it("uses the explicit no-timeout agent expiry instead of the chat 24h cap", async () => {
prime();
mocks.agentCommand.mockReturnValueOnce(new Promise(() => {}));

View File

@@ -1,4 +1,5 @@
import { randomUUID } from "node:crypto";
import { MessageChannel } from "node:worker_threads";
import {
listAgentIds,
resolveDefaultAgentId,
@@ -392,6 +393,18 @@ function dispatchAgentRunFromGateway(params: {
});
}
function yieldAfterAgentAcceptedAck(): Promise<void> {
return new Promise((resolve) => {
const channel = new MessageChannel();
channel.port1.on("message", () => {
channel.port1.close();
channel.port2.close();
resolve();
});
channel.port2.postMessage(undefined);
});
}
export const agentHandlers: GatewayRequestHandlers = {
agent: async ({ params, respond, context, client, isWebchatConnect }) => {
const p = params;
@@ -1116,6 +1129,10 @@ export const agentHandlers: GatewayRequestHandlers = {
},
});
respond(true, accepted, undefined, { runId });
// Give the accepted frame one event-loop turn to flush before the runner
// starts potentially heavy synchronous prompt/context setup. Otherwise a
// hot pre-turn path can starve the WebSocket caller until it times out.
await yieldAfterAgentAcceptedAck();
let dispatched = false;
try {

View File

@@ -25,6 +25,7 @@ import { withEnv } from "../test-utils/env.js";
import { resolveBundledRuntimeDependencyInstallRootPlan } from "./bundled-runtime-deps.js";
import { clearPluginCommands } from "./command-registry-state.js";
import { getPluginCommandSpecs } from "./command-specs.js";
import { listCompactionProviderIds } from "./compaction-provider.js";
import {
getGlobalHookRunner,
getGlobalPluginRegistry,
@@ -72,6 +73,7 @@ import {
getMemoryRuntime,
listActiveMemoryPublicArtifacts,
listMemoryCorpusSupplements,
listMemoryPromptSupplements,
registerMemoryCorpusSupplement,
registerMemoryFlushPlanResolver,
registerMemoryPromptSupplement,
@@ -3386,6 +3388,44 @@ module.exports = { id: "throws-after-import", register() {} };`,
expect(listAgentHarnessIds()).toEqual([]);
});
it("rejects malformed plugin agent harness registrations", () => {
useNoBundledPlugins();
const plugin = writePlugin({
id: "bad-harness",
filename: "bad-harness.cjs",
body: `module.exports = {
id: "bad-harness",
register(api) {
api.registerAgentHarness({
id: "broken",
label: "Broken",
});
},
};`,
});
const registry = loadOpenClawPlugins({
cache: false,
workspaceDir: plugin.dir,
config: {
plugins: {
load: { paths: [plugin.file] },
allow: ["bad-harness"],
},
},
onlyPluginIds: ["bad-harness"],
});
expect(listAgentHarnessIds()).toEqual([]);
expect(registry.diagnostics).toContainEqual(
expect.objectContaining({
level: "error",
pluginId: "bad-harness",
message: 'agent harness "broken" registration missing required runtime methods',
}),
);
});
it("does not register internal hooks globally during non-activating loads", () => {
useNoBundledPlugins();
const plugin = writePlugin({
@@ -5120,6 +5160,21 @@ module.exports = { id: "throws-after-import", register() {} };`,
).toBe("Demo Duplicate");
},
},
{
label: "rejects malformed plugin context engine registration",
pluginId: "context-engine-malformed",
body: `module.exports = { id: "context-engine-malformed", register(api) {
api.registerContextEngine({ id: "broken-context" });
} };`,
assert: (registry: ReturnType<typeof loadOpenClawPlugins>) => {
expectRegistryErrorDiagnostic({
registry,
pluginId: "context-engine-malformed",
message: "context engine registration missing id",
});
expect(listContextEngineIds()).not.toContain("broken-context");
},
},
{
label: "rejects plugin context engine ids reserved by core",
pluginId: "context-engine-core-collision",
@@ -5134,6 +5189,36 @@ module.exports = { id: "throws-after-import", register() {} };`,
});
},
},
{
label: "rejects malformed compaction provider registration",
pluginId: "compaction-provider-malformed",
body: `module.exports = { id: "compaction-provider-malformed", register(api) {
api.registerCompactionProvider({ id: "broken-compaction", label: "Broken" });
} };`,
assert: (registry: ReturnType<typeof loadOpenClawPlugins>) => {
expectRegistryErrorDiagnostic({
registry,
pluginId: "compaction-provider-malformed",
message: 'compaction provider "broken-compaction" registration missing summarize',
});
expect(listCompactionProviderIds()).not.toContain("broken-compaction");
},
},
{
label: "rejects malformed memory prompt supplement registration",
pluginId: "memory-prompt-supplement-malformed",
body: `module.exports = { id: "memory-prompt-supplement-malformed", register(api) {
api.registerMemoryPromptSupplement({ id: "broken-memory-prompt" });
} };`,
assert: (registry: ReturnType<typeof loadOpenClawPlugins>) => {
expectRegistryErrorDiagnostic({
registry,
pluginId: "memory-prompt-supplement-malformed",
message: "memory prompt supplement registration missing builder",
});
expect(listMemoryPromptSupplements()).toEqual([]);
},
},
{
label: "requires plugin CLI registrars to declare explicit command roots",
pluginId: "cli-missing-metadata",

View File

@@ -206,6 +206,14 @@ describe("memory plugin state", () => {
]);
});
it("ignores malformed prompt builder output", () => {
registerMemoryPromptSection(() => ["primary", 1, undefined] as never);
registerMemoryPromptSupplement("async-helper", () => Promise.resolve(["async"]) as never);
registerMemoryPromptSupplement("valid-helper", () => ["valid", false] as never);
expect(buildMemoryPromptSection({ availableTools: new Set() })).toEqual(["primary", "valid"]);
});
it("stores memory corpus supplements", async () => {
const supplement = {
search: async () => [{ corpus: "wiki", path: "sources/alpha.md", score: 1, snippet: "x" }],

View File

@@ -208,17 +208,25 @@ export function buildMemoryPromptSection(params: {
availableTools: Set<string>;
citationsMode?: MemoryCitationsMode;
}): string[] {
const primary =
const primary = normalizeMemoryPromptLines(
memoryPluginState.capability?.capability.promptBuilder?.(params) ??
memoryPluginState.promptBuilder?.(params) ??
[];
memoryPluginState.promptBuilder?.(params) ??
[],
);
const supplements = memoryPluginState.promptSupplements
// Keep supplement order stable even if plugin registration order changes.
.toSorted((left, right) => left.pluginId.localeCompare(right.pluginId))
.flatMap((registration) => registration.builder(params));
.flatMap((registration) => normalizeMemoryPromptLines(registration.builder(params)));
return [...primary, ...supplements];
}
function normalizeMemoryPromptLines(value: unknown): string[] {
if (!Array.isArray(value)) {
return [];
}
return value.filter((line): line is string => typeof line === "string");
}
export function getMemoryPromptSectionBuilder(): MemoryPromptSectionBuilder | undefined {
return memoryPluginState.capability?.capability.promptBuilder ?? memoryPluginState.promptBuilder;
}

View File

@@ -805,7 +805,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
};
const registerAgentHarness = (record: PluginRecord, harness: AgentHarness) => {
const id = harness.id.trim();
const id = normalizeOptionalString((harness as Partial<AgentHarness> | undefined)?.id) ?? "";
if (!id) {
pushDiagnostic({
level: "error",
@@ -815,6 +815,15 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
});
return;
}
if (typeof harness.supports !== "function" || typeof harness.runAttempt !== "function") {
pushDiagnostic({
level: "error",
pluginId: record.id,
source: record.source,
message: `agent harness "${id}" registration missing required runtime methods`,
});
return;
}
const existing =
registryParams.activateGlobalSideEffects === false
? registry.agentHarnesses.find((entry) => entry.harness.id === id)
@@ -2055,35 +2064,84 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
registerConversationBindingResolvedHandler(record, handler),
registerCommand: (command) => registerCommand(record, command),
registerContextEngine: (id, factory) => {
if (id === defaultSlotIdForKey("contextEngine")) {
const normalizedId = normalizeOptionalString(id) ?? "";
if (!normalizedId) {
pushDiagnostic({
level: "error",
pluginId: record.id,
source: record.source,
message: `context engine id reserved by core: ${id}`,
message: "context engine registration missing id",
});
return;
}
const result = registerContextEngineForOwner(id, factory, `plugin:${record.id}`, {
allowSameOwnerRefresh: true,
});
if (typeof factory !== "function") {
pushDiagnostic({
level: "error",
pluginId: record.id,
source: record.source,
message: `context engine "${normalizedId}" registration missing factory`,
});
return;
}
if (normalizedId === defaultSlotIdForKey("contextEngine")) {
pushDiagnostic({
level: "error",
pluginId: record.id,
source: record.source,
message: `context engine id reserved by core: ${normalizedId}`,
});
return;
}
const result = registerContextEngineForOwner(
normalizedId,
factory,
`plugin:${record.id}`,
{
allowSameOwnerRefresh: true,
},
);
if (!result.ok) {
pushDiagnostic({
level: "error",
pluginId: record.id,
source: record.source,
message: `context engine already registered: ${id} (${result.existingOwner})`,
message: `context engine already registered: ${normalizedId} (${result.existingOwner})`,
});
return;
}
if (!record.contextEngineIds?.includes(id)) {
record.contextEngineIds = [...(record.contextEngineIds ?? []), id];
if (!record.contextEngineIds?.includes(normalizedId)) {
record.contextEngineIds = [...(record.contextEngineIds ?? []), normalizedId];
}
},
registerCompactionProvider: (
provider: Parameters<OpenClawPluginApi["registerCompactionProvider"]>[0],
) => {
const existing = getRegisteredCompactionProvider(provider.id);
const id = normalizeOptionalString(
(
provider as Partial<
Parameters<OpenClawPluginApi["registerCompactionProvider"]>[0]
> | null
)?.id,
);
if (!id) {
pushDiagnostic({
level: "error",
pluginId: record.id,
source: record.source,
message: "compaction provider registration missing id",
});
return;
}
if (typeof provider?.summarize !== "function") {
pushDiagnostic({
level: "error",
pluginId: record.id,
source: record.source,
message: `compaction provider "${id}" registration missing summarize`,
});
return;
}
const existing = getRegisteredCompactionProvider(id);
if (existing) {
const ownerDetail = existing.ownerPluginId
? ` (owner: ${existing.ownerPluginId})`
@@ -2092,7 +2150,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
level: "error",
pluginId: record.id,
source: record.source,
message: `compaction provider already registered: ${provider.id}${ownerDetail}`,
message: `compaction provider already registered: ${id}${ownerDetail}`,
});
return;
}
@@ -2185,6 +2243,15 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
registerMemoryPromptSection(builder);
},
registerMemoryPromptSupplement: (builder) => {
if (typeof builder !== "function") {
pushDiagnostic({
level: "error",
pluginId: record.id,
source: record.source,
message: "memory prompt supplement registration missing builder",
});
return;
}
registerMemoryPromptSupplement(record.id, builder);
},
registerMemoryCorpusSupplement: (supplement) => {