mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-19 00:14:46 +00:00
fix(gateway): harden startup restart queue (#82660) (thanks @samzong)
This commit is contained in:
@@ -37,6 +37,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Codex app-server: mark native context compaction completion events as successful, preventing false "Compaction incomplete" notices after successful Codex-managed compaction. Fixes #82470. (#81593) Thanks @Kyzcreig.
|
||||
- Codex app-server: keep long-running turns alive while current-turn approvals, user input, dynamic tools, and notifications make progress, and carry that progress into the outer run timeout. (#82601) Thanks @100yenadmin.
|
||||
- Gateway/channels: hand off traced channel account startup outside the startup diagnostic phase so long-lived channel tasks do not keep liveness warnings pinned to channel startup. Refs #82398.
|
||||
- Gateway/restart: queue restart and shutdown signals received while the gateway startup loop is still returning its server handle, so startup-time restarts are not dropped during update churn. (#82660) Thanks @samzong.
|
||||
- GitHub Copilot: route device-login requests through the plugin SSRF guard with a GitHub-only policy.
|
||||
- Group/channel replies: keep message-tool-preferred final replies private when the agent misses the message tool, and log suppressed payload metadata in the gateway debug log for quieter diagnosis.
|
||||
- Gateway/WebChat: route image attachments through a configured vision-capable `imageModel` plan before inlining images, and carry that image-model fallback chain through runtime retries. (#82524) Thanks @frankekn.
|
||||
|
||||
@@ -635,6 +635,10 @@ describe("runGatewayLoop", () => {
|
||||
() => markGatewaySigusr1RestartHandled.mock.calls.length > 0,
|
||||
"expected SIGUSR1 handler to consume the restart before startup returned",
|
||||
);
|
||||
await waitForLoopCondition(
|
||||
() => markGatewayDraining.mock.calls.length > 0,
|
||||
"expected queued startup restart to mark gateway draining before startup returned",
|
||||
);
|
||||
return { close: closeFirst };
|
||||
});
|
||||
start.mockImplementationOnce(async () => {
|
||||
@@ -675,6 +679,51 @@ describe("runGatewayLoop", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("exits if a queued startup restart never reaches a close handle", async () => {
|
||||
vi.clearAllMocks();
|
||||
peekGatewaySigusr1RestartReason.mockReturnValue(undefined);
|
||||
vi.useFakeTimers();
|
||||
|
||||
try {
|
||||
await withIsolatedSignals(async ({ captureSignal }) => {
|
||||
const close = vi.fn(async () => {});
|
||||
const startupNeverReturns = new Promise<void>(() => {});
|
||||
const { runtime, exited } = createRuntimeWithExitSignal();
|
||||
const start = vi.fn(async () => {
|
||||
await startupNeverReturns;
|
||||
return { close };
|
||||
});
|
||||
|
||||
const { runGatewayLoop } = await import("./run-loop.js");
|
||||
void runGatewayLoop({
|
||||
start: start as unknown as Parameters<typeof runGatewayLoop>[0]["start"],
|
||||
runtime: runtime as unknown as Parameters<typeof runGatewayLoop>[0]["runtime"],
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
const sigusr1 = captureSignal("SIGUSR1");
|
||||
|
||||
sigusr1();
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
expect(markGatewaySigusr1RestartHandled).toHaveBeenCalledTimes(1);
|
||||
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
|
||||
expect(runtime.exit).not.toHaveBeenCalled();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(24_999);
|
||||
expect(runtime.exit).not.toHaveBeenCalled();
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
|
||||
await expect(exited).resolves.toBe(1);
|
||||
expect(close).not.toHaveBeenCalled();
|
||||
expect(start).toHaveBeenCalledTimes(1);
|
||||
expect(gatewayLog.error).toHaveBeenCalledWith(
|
||||
"startup restart request timed out before gateway returned a close handle; exiting for supervisor recovery",
|
||||
);
|
||||
});
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("processes SIGINT immediately before startup returns a server", async () => {
|
||||
vi.clearAllMocks();
|
||||
|
||||
@@ -736,7 +785,7 @@ describe("runGatewayLoop", () => {
|
||||
|
||||
await expect(exited).resolves.toBe(0);
|
||||
expect(close).not.toHaveBeenCalled();
|
||||
expect(markGatewayDraining).not.toHaveBeenCalled();
|
||||
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
|
||||
expect(start).toHaveBeenCalledTimes(1);
|
||||
expect(acquireGatewayLock).toHaveBeenCalledTimes(1);
|
||||
expect(gatewayLog.info).toHaveBeenCalledWith(
|
||||
|
||||
@@ -112,6 +112,8 @@ export async function runGatewayLoop(params: {
|
||||
// The HTTP server can report ready before params.start returns its close handle.
|
||||
// Defer lifecycle signals from that window until the loop can close and advance.
|
||||
let pendingStartupRequest: GatewayRunSignalRequest | null = null;
|
||||
let pendingStartupForceExitTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let restartDrainingMarkPromise: Promise<void> | null = null;
|
||||
let startupFailedWithoutServerHandle = false;
|
||||
const processInstanceId = randomUUID();
|
||||
const waitForHealthyChild = params.waitForHealthyChild ?? waitForHealthyGatewayChild;
|
||||
@@ -305,6 +307,32 @@ export async function runGatewayLoop(params: {
|
||||
|
||||
const SUPERVISOR_STOP_TIMEOUT_MS = 30_000;
|
||||
const SHUTDOWN_TIMEOUT_MS = SUPERVISOR_STOP_TIMEOUT_MS - 5_000;
|
||||
const clearPendingStartupForceExitTimer = () => {
|
||||
if (!pendingStartupForceExitTimer) {
|
||||
return;
|
||||
}
|
||||
clearTimeout(pendingStartupForceExitTimer);
|
||||
pendingStartupForceExitTimer = null;
|
||||
};
|
||||
const armPendingStartupForceExitTimer = () => {
|
||||
if (pendingStartupForceExitTimer) {
|
||||
return;
|
||||
}
|
||||
pendingStartupForceExitTimer = setTimeout(() => {
|
||||
pendingStartupForceExitTimer = null;
|
||||
gatewayLog.error(
|
||||
"startup restart request timed out before gateway returned a close handle; exiting for supervisor recovery",
|
||||
);
|
||||
void (async () => {
|
||||
try {
|
||||
await writeStabilityBundle("gateway.restart_startup_request_timeout");
|
||||
} finally {
|
||||
exitProcess(1);
|
||||
}
|
||||
})();
|
||||
}, SHUTDOWN_TIMEOUT_MS);
|
||||
pendingStartupForceExitTimer.unref?.();
|
||||
};
|
||||
const resolveRestartDrainTimeoutMs = async (
|
||||
restartIntent?: RestartIntentOptions,
|
||||
): Promise<RestartDrainTimeoutMs> => {
|
||||
@@ -323,6 +351,18 @@ export async function runGatewayLoop(params: {
|
||||
return DEFAULT_RESTART_DRAIN_TIMEOUT_MS;
|
||||
}
|
||||
};
|
||||
const markRestartDraining = async () => {
|
||||
if (!restartDrainingMarkPromise) {
|
||||
restartDrainingMarkPromise = (async () => {
|
||||
const { markGatewayDraining } = await loadGatewayLifecycleRuntimeModule();
|
||||
markGatewayDraining();
|
||||
})().catch((err) => {
|
||||
restartDrainingMarkPromise = null;
|
||||
throw err;
|
||||
});
|
||||
}
|
||||
await restartDrainingMarkPromise;
|
||||
};
|
||||
|
||||
const runAcceptedRequest = ({
|
||||
action,
|
||||
@@ -404,7 +444,6 @@ export async function runGatewayLoop(params: {
|
||||
getInspectableActiveTaskRestartBlockers,
|
||||
getActiveEmbeddedRunCount,
|
||||
getActiveTaskCount,
|
||||
markGatewayDraining,
|
||||
waitForActiveEmbeddedRuns,
|
||||
waitForActiveTasks,
|
||||
} = await loadGatewayLifecycleRuntimeModule();
|
||||
@@ -439,7 +478,7 @@ export async function runGatewayLoop(params: {
|
||||
|
||||
// Reject new enqueues immediately during the drain window so
|
||||
// sessions get an explicit restart error instead of silent task loss.
|
||||
markGatewayDraining();
|
||||
await markRestartDraining();
|
||||
const activeTasks = getActiveTaskCount();
|
||||
const activeRuns = getActiveEmbeddedRunCount();
|
||||
activeTasksAtDrainStart = activeTasks;
|
||||
@@ -540,6 +579,7 @@ export async function runGatewayLoop(params: {
|
||||
}
|
||||
const request = pendingStartupRequest;
|
||||
pendingStartupRequest = null;
|
||||
clearPendingStartupForceExitTimer();
|
||||
startupFailedWithoutServerHandle = false;
|
||||
runAcceptedRequest(request);
|
||||
};
|
||||
@@ -554,6 +594,7 @@ export async function runGatewayLoop(params: {
|
||||
if (action === "stop" && pendingStartupRequest && !server) {
|
||||
gatewayLog.info(`received ${signal}; overriding pending startup restart with shutdown`);
|
||||
pendingStartupRequest = null;
|
||||
clearPendingStartupForceExitTimer();
|
||||
startupFailedWithoutServerHandle = false;
|
||||
runAcceptedRequest(acceptedRequest);
|
||||
return;
|
||||
@@ -583,6 +624,10 @@ export async function runGatewayLoop(params: {
|
||||
}
|
||||
if (!server || !restartResolver) {
|
||||
pendingStartupRequest = acceptedRequest;
|
||||
void markRestartDraining().catch((err) => {
|
||||
gatewayLog.warn(`failed to mark gateway draining for startup restart: ${String(err)}`);
|
||||
});
|
||||
armPendingStartupForceExitTimer();
|
||||
return;
|
||||
}
|
||||
runAcceptedRequest(acceptedRequest);
|
||||
@@ -668,6 +713,7 @@ export async function runGatewayLoop(params: {
|
||||
let isFirstStart = true;
|
||||
for (;;) {
|
||||
await onIteration();
|
||||
restartDrainingMarkPromise = null;
|
||||
let startupFailedBeforeServerHandle = false;
|
||||
try {
|
||||
server = await params.start({ startupStartedAt });
|
||||
|
||||
Reference in New Issue
Block a user