fix(gateway): cancel post-ready maintenance on close

Fixes the post-ready maintenance shutdown race by marking close before gateway_stop hooks, clearing delayed timers, and suppressing already-fired maintenance work during shutdown.

Verification:
- pnpm test:serial src/gateway/server-runtime-services.test.ts src/gateway/server-import-boundary.test.ts
- pnpm exec oxfmt --check --threads=1 src/gateway/server.impl.ts src/gateway/server-import-boundary.test.ts src/gateway/server-runtime-services.ts src/gateway/server-runtime-services.test.ts
- git diff --check
- crabbox blacksmith-testbox tbx_01kqrw87d527jwcfxbp6qk1wc3: pnpm check:changed (exit 0)
This commit is contained in:
Vincent Koc
2026-05-03 23:56:56 -07:00
committed by GitHub
parent 8a8a12559d
commit f4f98f45c7
4 changed files with 201 additions and 24 deletions

View File

@@ -50,4 +50,15 @@ describe("gateway startup import boundaries", () => {
expect(validation).not.toContain("legacy-secretref-env-marker");
expect(validation).not.toContain("commands/doctor");
});
it("marks gateway close before awaiting gateway_stop hooks", () => {
const serverImpl = readSource("src/gateway/server.impl.ts");
const closeStart = serverImpl.indexOf("close: async (opts)");
const hookStart = serverImpl.indexOf("runGlobalGatewayStopSafely", closeStart);
const markStart = serverImpl.indexOf("markClosePreludeStarted();", closeStart);
expect(closeStart).toBeGreaterThan(-1);
expect(markStart).toBeGreaterThan(closeStart);
expect(markStart).toBeLessThan(hookStart);
});
});

View File

@@ -55,6 +55,7 @@ vi.mock("./model-pricing-cache.js", () => ({
const {
activateGatewayScheduledServices,
runGatewayPostReadyMaintenance,
scheduleGatewayPostReadyMaintenance,
startGatewayRuntimeServices,
} = await import("./server-runtime-services.js");
@@ -245,6 +246,64 @@ describe("server-runtime-services", () => {
expect(recordPostReadyMemory).toHaveBeenCalledTimes(1);
});
it("returns a cancellable post-ready maintenance timer", async () => {
vi.useFakeTimers();
const startMaintenance = vi.fn(async () => null);
const onStarted = vi.fn();
const handle = scheduleGatewayPostReadyMaintenance(
createPostReadyMaintenanceScheduleParams({
delayMs: 25,
onStarted,
startMaintenance,
}),
);
clearTimeout(handle);
await vi.advanceTimersByTimeAsync(25);
expect(onStarted).not.toHaveBeenCalled();
expect(startMaintenance).not.toHaveBeenCalled();
});
it("clears delayed maintenance handles when close starts during maintenance startup", async () => {
vi.useFakeTimers();
let closing = false;
let resolveMaintenance!: (maintenance: ReturnType<typeof createMaintenanceHandles>) => void;
const startMaintenance = vi.fn(
() =>
new Promise<ReturnType<typeof createMaintenanceHandles>>((resolve) => {
resolveMaintenance = resolve;
}),
);
const applyMaintenance = vi.fn();
const cron = { start: vi.fn(async () => undefined) };
const recordPostReadyMemory = vi.fn();
scheduleGatewayPostReadyMaintenance(
createPostReadyMaintenanceScheduleParams({
delayMs: 25,
isClosing: () => closing,
startMaintenance,
applyMaintenance,
cron,
recordPostReadyMemory,
}),
);
await vi.advanceTimersByTimeAsync(25);
expect(startMaintenance).toHaveBeenCalledTimes(1);
closing = true;
resolveMaintenance(createMaintenanceHandles());
await Promise.resolve();
await Promise.resolve();
expect(applyMaintenance).not.toHaveBeenCalled();
expect(cron.start).not.toHaveBeenCalled();
expect(recordPostReadyMemory).not.toHaveBeenCalled();
expect(vi.getTimerCount()).toBe(0);
});
it("keeps scheduled services disabled for minimal test gateways", () => {
const cron = { start: vi.fn(async () => undefined) };
@@ -279,3 +338,30 @@ function createLog() {
error: vi.fn(),
};
}
function createPostReadyMaintenanceScheduleParams(
overrides: Partial<Parameters<typeof scheduleGatewayPostReadyMaintenance>[0]> = {},
): Parameters<typeof scheduleGatewayPostReadyMaintenance>[0] {
return {
delayMs: 1,
isClosing: () => false,
startMaintenance: vi.fn(async () => null),
applyMaintenance: vi.fn(),
shouldStartCron: () => true,
markCronStartHandled: vi.fn(),
cron: { start: vi.fn(async () => undefined) },
logCron: { error: vi.fn() },
log: createLog(),
recordPostReadyMemory: vi.fn(),
...overrides,
};
}
function createMaintenanceHandles() {
return {
tickInterval: setInterval(() => undefined, 60_000),
healthInterval: setInterval(() => undefined, 60_000),
dedupeCleanup: setInterval(() => undefined, 60_000),
mediaCleanup: setInterval(() => undefined, 60_000),
};
}

View File

@@ -18,7 +18,7 @@ type GatewayRuntimeServiceLogger = {
type GatewayPostReadyLogger = {
warn: (message: string) => void;
};
type GatewayMaintenanceHandles = NonNullable<
export type GatewayMaintenanceHandles = NonNullable<
Awaited<ReturnType<typeof startGatewayMaintenanceTimers>>
>;
@@ -60,6 +60,18 @@ export function startGatewayCronWithLogging(params: {
void params.cron.start().catch((err) => params.logCron.error(`failed to start: ${String(err)}`));
}
function clearGatewayMaintenanceHandles(maintenance: GatewayMaintenanceHandles | null): void {
if (!maintenance) {
return;
}
clearInterval(maintenance.tickInterval);
clearInterval(maintenance.healthInterval);
clearInterval(maintenance.dedupeCleanup);
if (maintenance.mediaCleanup) {
clearInterval(maintenance.mediaCleanup);
}
}
export async function runGatewayPostReadyMaintenance(params: {
startMaintenance: () => Promise<GatewayMaintenanceHandles | null>;
applyMaintenance: (maintenance: GatewayMaintenanceHandles) => void;
@@ -88,6 +100,59 @@ export async function runGatewayPostReadyMaintenance(params: {
params.recordPostReadyMemory();
}
export function scheduleGatewayPostReadyMaintenance(params: {
delayMs: number;
isClosing: () => boolean;
onStarted?: () => void;
startMaintenance: () => Promise<GatewayMaintenanceHandles | null>;
applyMaintenance: (maintenance: GatewayMaintenanceHandles) => void;
shouldStartCron: () => boolean;
markCronStartHandled: () => void;
cron: { start: () => Promise<void> };
logCron: { error: (message: string) => void };
log: GatewayPostReadyLogger;
recordPostReadyMemory: () => void;
}): ReturnType<typeof setTimeout> {
const timer = setTimeout(() => {
params.onStarted?.();
if (params.isClosing()) {
return;
}
void runGatewayPostReadyMaintenance({
startMaintenance: async () => {
if (params.isClosing()) {
return null;
}
const maintenance = await params.startMaintenance();
if (params.isClosing()) {
clearGatewayMaintenanceHandles(maintenance);
return null;
}
return maintenance;
},
applyMaintenance: (maintenance) => {
if (params.isClosing()) {
clearGatewayMaintenanceHandles(maintenance);
return;
}
params.applyMaintenance(maintenance);
},
shouldStartCron: () => !params.isClosing() && params.shouldStartCron(),
markCronStartHandled: params.markCronStartHandled,
cron: params.cron,
logCron: params.logCron,
log: params.log,
recordPostReadyMemory: () => {
if (!params.isClosing()) {
params.recordPostReadyMemory();
}
},
});
}, params.delayMs);
timer.unref?.();
return timer;
}
function recoverPendingOutboundDeliveries(params: {
cfg: OpenClawConfig;
log: GatewayRuntimeServiceLogger;

View File

@@ -894,8 +894,20 @@ export async function startGatewayServer(
deps.cron = runtimeState.cronState.cron;
let closePreludeStarted = false;
const runClosePrelude = async () => {
let postReadyMaintenanceTimer: ReturnType<typeof setTimeout> | null = null;
const clearPostReadyMaintenanceTimer = () => {
if (!postReadyMaintenanceTimer) {
return;
}
clearTimeout(postReadyMaintenanceTimer);
postReadyMaintenanceTimer = null;
};
const markClosePreludeStarted = () => {
closePreludeStarted = true;
clearPostReadyMaintenanceTimer();
};
const runClosePrelude = async () => {
markClosePreludeStarted();
clearCurrentPluginMetadataSnapshot();
const { runGatewayClosePrelude } = await loadGatewayCloseModule();
await runGatewayClosePrelude({
@@ -1492,28 +1504,30 @@ export async function startGatewayServer(
log.warn(`gateway: failed to promote config last-known-good backup: ${String(err)}`);
});
if (!minimalTestGateway) {
const handle = setTimeout(() => {
void gatewayRuntimeServices.runGatewayPostReadyMaintenance({
startMaintenance: earlyRuntime.startMaintenance,
applyMaintenance: (maintenance) => {
runtimeState.tickInterval = maintenance.tickInterval;
runtimeState.healthInterval = maintenance.healthInterval;
runtimeState.dedupeCleanup = maintenance.dedupeCleanup;
runtimeState.mediaCleanup = maintenance.mediaCleanup;
},
shouldStartCron: () => !gatewayCronStartHandled,
markCronStartHandled: () => {
gatewayCronStartHandled = true;
},
cron: runtimeState.cronState.cron,
logCron,
log,
recordPostReadyMemory: () => {
startupTrace.detail("memory.post-ready", collectProcessMemoryUsageMb());
},
});
}, POST_READY_MAINTENANCE_DELAY_MS);
handle.unref?.();
postReadyMaintenanceTimer = gatewayRuntimeServices.scheduleGatewayPostReadyMaintenance({
delayMs: POST_READY_MAINTENANCE_DELAY_MS,
isClosing: () => closePreludeStarted,
onStarted: () => {
postReadyMaintenanceTimer = null;
},
startMaintenance: earlyRuntime.startMaintenance,
applyMaintenance: (maintenance) => {
runtimeState.tickInterval = maintenance.tickInterval;
runtimeState.healthInterval = maintenance.healthInterval;
runtimeState.dedupeCleanup = maintenance.dedupeCleanup;
runtimeState.mediaCleanup = maintenance.mediaCleanup;
},
shouldStartCron: () => !gatewayCronStartHandled,
markCronStartHandled: () => {
gatewayCronStartHandled = true;
},
cron: runtimeState.cronState.cron,
logCron,
log,
recordPostReadyMemory: () => {
startupTrace.detail("memory.post-ready", collectProcessMemoryUsageMb());
},
});
} else {
startupTrace.detail("memory.post-ready", collectProcessMemoryUsageMb());
}
@@ -1527,6 +1541,7 @@ export async function startGatewayServer(
return {
close: async (opts) => {
try {
markClosePreludeStarted();
// Run gateway_stop plugin hook before shutdown
const { runGlobalGatewayStopSafely } = await import("../plugins/hook-runner-global.js");
await runGlobalGatewayStopSafely({