mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-09 12:42:55 +00:00
fix(gateway): clear fallback context on close
Fixes gateway fallback request context cleanup on close/startup failure and shards the full gateway Vitest lane to avoid the observed memory hang.\n\nValidation:\n- Testbox: OPENCLAW_TESTBOX=1 pnpm check:changed\n- Testbox: env OPENCLAW_VITEST_MAX_WORKERS=1 /usr/bin/time -v pnpm test:gateway (254 files, 2950 tests, max RSS 4144692 KB)
This commit is contained in:
28
src/gateway/server-plugins.lifecycle.test.ts
Normal file
28
src/gateway/server-plugins.lifecycle.test.ts
Normal file
@@ -0,0 +1,28 @@
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import { clearFallbackGatewayContext, createGatewaySubagentRuntime } from "./server-plugins.js";
|
||||
import { installGatewayTestHooks, startServer } from "./test-helpers.server.js";
|
||||
|
||||
installGatewayTestHooks();
|
||||
|
||||
afterEach(() => {
|
||||
clearFallbackGatewayContext();
|
||||
});
|
||||
|
||||
describe("gateway plugin fallback context lifecycle", () => {
|
||||
it("clears the fallback gateway context after server close", async () => {
|
||||
const runtime = createGatewaySubagentRuntime();
|
||||
const started = await startServer();
|
||||
|
||||
try {
|
||||
await expect(
|
||||
runtime.getSessionMessages({ sessionKey: "agent:main:main", limit: 1 }),
|
||||
).resolves.toEqual({ messages: [] });
|
||||
} finally {
|
||||
await started.server.close({ reason: "fallback context lifecycle test done" });
|
||||
}
|
||||
|
||||
await expect(
|
||||
runtime.getSessionMessages({ sessionKey: "agent:main:main", limit: 1 }),
|
||||
).rejects.toThrow("No scope set and no fallback context available");
|
||||
});
|
||||
});
|
||||
@@ -344,6 +344,7 @@ beforeEach(() => {
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
serverPluginsModule.clearFallbackGatewayContext();
|
||||
runtimeModule.clearGatewaySubagentRuntime();
|
||||
runtimeRegistryModule.resetPluginRuntimeStateForTest();
|
||||
});
|
||||
@@ -1388,4 +1389,52 @@ describe("loadGatewayPlugins", () => {
|
||||
await runtime.run({ sessionKey: "s-5", message: "prefer resolver" });
|
||||
expect(getLastDispatchedContext()).toBe(freshContext);
|
||||
});
|
||||
|
||||
test("clears fallback context snapshots when a resolver is registered", async () => {
|
||||
const serverPlugins = serverPluginsModule;
|
||||
const runtime = await createSubagentRuntime(serverPlugins);
|
||||
const staleContext = createTestContext("stale-snapshot");
|
||||
|
||||
serverPlugins.setFallbackGatewayContext(staleContext);
|
||||
serverPlugins.setFallbackGatewayContextResolver(() => undefined);
|
||||
|
||||
await expect(runtime.run({ sessionKey: "s-6", message: "stale fallback" })).rejects.toThrow(
|
||||
"No scope set and no fallback context available",
|
||||
);
|
||||
});
|
||||
|
||||
test("clears fallback context and resolver state", async () => {
|
||||
const serverPlugins = serverPluginsModule;
|
||||
const runtime = await createSubagentRuntime(serverPlugins);
|
||||
const context = createTestContext("clear-context");
|
||||
|
||||
serverPlugins.setFallbackGatewayContextResolver(() => context);
|
||||
await runtime.run({ sessionKey: "s-7", message: "before clear" });
|
||||
expect(getLastDispatchedContext()).toBe(context);
|
||||
|
||||
serverPlugins.clearFallbackGatewayContext();
|
||||
|
||||
await expect(runtime.run({ sessionKey: "s-7", message: "after clear" })).rejects.toThrow(
|
||||
"No scope set and no fallback context available",
|
||||
);
|
||||
});
|
||||
|
||||
test("resolver cleanup only clears the resolver it registered", async () => {
|
||||
const serverPlugins = serverPluginsModule;
|
||||
const runtime = await createSubagentRuntime(serverPlugins);
|
||||
const firstContext = createTestContext("first-owner");
|
||||
const secondContext = createTestContext("second-owner");
|
||||
|
||||
const clearFirst = serverPlugins.setFallbackGatewayContextResolver(() => firstContext);
|
||||
const clearSecond = serverPlugins.setFallbackGatewayContextResolver(() => secondContext);
|
||||
|
||||
clearFirst();
|
||||
await runtime.run({ sessionKey: "s-8", message: "after first cleanup" });
|
||||
expect(getLastDispatchedContext()).toBe(secondContext);
|
||||
|
||||
clearSecond();
|
||||
await expect(
|
||||
runtime.run({ sessionKey: "s-8", message: "after second cleanup" }),
|
||||
).rejects.toThrow("No scope set and no fallback context available");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -44,17 +44,40 @@ const getFallbackGatewayContextState = () =>
|
||||
resolveContext: undefined,
|
||||
}));
|
||||
|
||||
export function setFallbackGatewayContext(ctx: GatewayRequestContext): void {
|
||||
export function setFallbackGatewayContext(ctx: GatewayRequestContext): () => void {
|
||||
const fallbackGatewayContextState = getFallbackGatewayContextState();
|
||||
fallbackGatewayContextState.context = ctx;
|
||||
fallbackGatewayContextState.resolveContext = undefined;
|
||||
return () => {
|
||||
const currentFallbackGatewayContextState = getFallbackGatewayContextState();
|
||||
if (
|
||||
currentFallbackGatewayContextState.context === ctx &&
|
||||
currentFallbackGatewayContextState.resolveContext === undefined
|
||||
) {
|
||||
currentFallbackGatewayContextState.context = undefined;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
export function setFallbackGatewayContextResolver(
|
||||
resolveContext: () => GatewayRequestContext | undefined,
|
||||
): void {
|
||||
): () => void {
|
||||
const fallbackGatewayContextState = getFallbackGatewayContextState();
|
||||
fallbackGatewayContextState.context = undefined;
|
||||
fallbackGatewayContextState.resolveContext = resolveContext;
|
||||
return () => {
|
||||
const currentFallbackGatewayContextState = getFallbackGatewayContextState();
|
||||
if (currentFallbackGatewayContextState.resolveContext === resolveContext) {
|
||||
currentFallbackGatewayContextState.context = undefined;
|
||||
currentFallbackGatewayContextState.resolveContext = undefined;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
export function clearFallbackGatewayContext(): void {
|
||||
const fallbackGatewayContextState = getFallbackGatewayContextState();
|
||||
fallbackGatewayContextState.context = undefined;
|
||||
fallbackGatewayContextState.resolveContext = undefined;
|
||||
}
|
||||
|
||||
function getFallbackGatewayContext(): GatewayRequestContext | undefined {
|
||||
|
||||
@@ -121,6 +121,9 @@ const mocks = vi.hoisted(() => {
|
||||
};
|
||||
});
|
||||
|
||||
vi.unmock("./server-restart-sentinel.js");
|
||||
vi.resetModules();
|
||||
|
||||
vi.mock("../agents/agent-scope.js", () => ({
|
||||
resolveSessionAgentId: mocks.resolveSessionAgentId,
|
||||
}));
|
||||
|
||||
@@ -726,9 +726,14 @@ export async function startGatewayServer(
|
||||
httpServers,
|
||||
})(opts);
|
||||
};
|
||||
let clearFallbackGatewayContextForServer = () => {};
|
||||
const closeOnStartupFailure = async () => {
|
||||
await runClosePrelude();
|
||||
await createCloseHandler()({ reason: "gateway startup failed" });
|
||||
try {
|
||||
await runClosePrelude();
|
||||
await createCloseHandler()({ reason: "gateway startup failed" });
|
||||
} finally {
|
||||
clearFallbackGatewayContextForServer();
|
||||
}
|
||||
};
|
||||
const broadcastVoiceWakeRoutingChanged = (config: VoiceWakeRoutingConfig) => {
|
||||
broadcast("voicewake.routing.changed", { config }, { dropIfSlow: true });
|
||||
@@ -888,7 +893,15 @@ export async function startGatewayServer(
|
||||
broadcastVoiceWakeRoutingChanged,
|
||||
});
|
||||
|
||||
setFallbackGatewayContextResolver(() => gatewayRequestContext);
|
||||
const fallbackGatewayContextCleanup: unknown = setFallbackGatewayContextResolver(
|
||||
() => gatewayRequestContext,
|
||||
);
|
||||
clearFallbackGatewayContextForServer =
|
||||
typeof fallbackGatewayContextCleanup === "function"
|
||||
? () => {
|
||||
fallbackGatewayContextCleanup();
|
||||
}
|
||||
: () => {};
|
||||
|
||||
if (!minimalTestGateway) {
|
||||
if (deferredConfiguredChannelPluginIds.length > 0) {
|
||||
@@ -1039,14 +1052,18 @@ export async function startGatewayServer(
|
||||
|
||||
return {
|
||||
close: async (opts) => {
|
||||
// Run gateway_stop plugin hook before shutdown
|
||||
await runGlobalGatewayStopSafely({
|
||||
event: { reason: opts?.reason ?? "gateway stopping" },
|
||||
ctx: { port },
|
||||
onError: (err) => log.warn(`gateway_stop hook failed: ${String(err)}`),
|
||||
});
|
||||
await runClosePrelude();
|
||||
await close(opts);
|
||||
try {
|
||||
// Run gateway_stop plugin hook before shutdown
|
||||
await runGlobalGatewayStopSafely({
|
||||
event: { reason: opts?.reason ?? "gateway stopping" },
|
||||
ctx: { port },
|
||||
onError: (err) => log.warn(`gateway_stop hook failed: ${String(err)}`),
|
||||
});
|
||||
await runClosePrelude();
|
||||
await close(opts);
|
||||
} finally {
|
||||
clearFallbackGatewayContextForServer();
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -237,7 +237,7 @@ describe("test scripts", () => {
|
||||
expect(pkg.scripts?.["test"]).toBe("node scripts/test-projects.mjs");
|
||||
expect(pkg.scripts?.["test:force"]).toBe("node --import tsx scripts/test-force.ts");
|
||||
expect(pkg.scripts?.["test:gateway"]).toBe(
|
||||
"node scripts/run-vitest.mjs run --config test/vitest/vitest.gateway.config.ts",
|
||||
"OPENCLAW_GATEWAY_PROJECT_SHARDS=1 node scripts/run-vitest.mjs run --config test/vitest/vitest.gateway.config.ts",
|
||||
);
|
||||
expect(pkg.scripts?.["test:single"]).toBeUndefined();
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user