mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 18:00:54 +00:00
fix: preserve restart hooks during async prep (#70269)
This commit is contained in:
@@ -173,6 +173,51 @@ describe("infra runtime", () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("keeps restart requests coalesced while preparation is in flight", async () => {
|
||||||
|
let releaseFirstPrep: () => void = () => {};
|
||||||
|
const firstRollback = vi.fn(async () => {});
|
||||||
|
const firstBeforeEmit = vi.fn(
|
||||||
|
() =>
|
||||||
|
new Promise<void>((resolve) => {
|
||||||
|
releaseFirstPrep = resolve;
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
const latestBeforeEmit = vi.fn(async () => {});
|
||||||
|
const emitSpy = vi.spyOn(process, "emit");
|
||||||
|
const handler = () => {};
|
||||||
|
process.on("SIGUSR1", handler);
|
||||||
|
try {
|
||||||
|
scheduleGatewaySigusr1Restart({
|
||||||
|
delayMs: 1_000,
|
||||||
|
reason: "first",
|
||||||
|
emitHooks: {
|
||||||
|
beforeEmit: firstBeforeEmit,
|
||||||
|
afterEmitRejected: firstRollback,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
await vi.advanceTimersByTimeAsync(1_000);
|
||||||
|
expect(firstBeforeEmit).toHaveBeenCalledTimes(1);
|
||||||
|
expect(emitSpy).not.toHaveBeenCalledWith("SIGUSR1");
|
||||||
|
|
||||||
|
const second = scheduleGatewaySigusr1Restart({
|
||||||
|
delayMs: 1_000,
|
||||||
|
reason: "second",
|
||||||
|
emitHooks: { beforeEmit: latestBeforeEmit },
|
||||||
|
});
|
||||||
|
expect(second.coalesced).toBe(true);
|
||||||
|
|
||||||
|
releaseFirstPrep();
|
||||||
|
await vi.advanceTimersByTimeAsync(0);
|
||||||
|
|
||||||
|
expect(firstRollback).toHaveBeenCalledTimes(1);
|
||||||
|
expect(latestBeforeEmit).toHaveBeenCalledTimes(1);
|
||||||
|
expect(emitSpy).toHaveBeenCalledWith("SIGUSR1");
|
||||||
|
} finally {
|
||||||
|
process.removeListener("SIGUSR1", handler);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
it("rolls back prepared restart state when emission is rejected", async () => {
|
it("rolls back prepared restart state when emission is rejected", async () => {
|
||||||
const beforeEmit = vi.fn(async () => {});
|
const beforeEmit = vi.fn(async () => {});
|
||||||
const afterEmitRejected = vi.fn(async () => {});
|
const afterEmitRejected = vi.fn(async () => {});
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ let pendingRestartTimer: ReturnType<typeof setTimeout> | null = null;
|
|||||||
let pendingRestartDueAt = 0;
|
let pendingRestartDueAt = 0;
|
||||||
let pendingRestartReason: string | undefined;
|
let pendingRestartReason: string | undefined;
|
||||||
let pendingRestartEmitHooks: RestartEmitHooks | undefined;
|
let pendingRestartEmitHooks: RestartEmitHooks | undefined;
|
||||||
|
let pendingRestartPreparing = false;
|
||||||
const activeDeferralPolls = new Set<ReturnType<typeof setInterval>>();
|
const activeDeferralPolls = new Set<ReturnType<typeof setInterval>>();
|
||||||
|
|
||||||
function hasUnconsumedRestartSignal(): boolean {
|
function hasUnconsumedRestartSignal(): boolean {
|
||||||
@@ -51,6 +52,7 @@ function clearPendingScheduledRestart(): void {
|
|||||||
pendingRestartDueAt = 0;
|
pendingRestartDueAt = 0;
|
||||||
pendingRestartReason = undefined;
|
pendingRestartReason = undefined;
|
||||||
pendingRestartEmitHooks = undefined;
|
pendingRestartEmitHooks = undefined;
|
||||||
|
pendingRestartPreparing = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
function clearActiveDeferralPolls(): void {
|
function clearActiveDeferralPolls(): void {
|
||||||
@@ -211,15 +213,34 @@ function updatePendingRestartEmitHooks(hooks?: RestartEmitHooks): void {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async function emitPreparedGatewayRestart(hooks?: RestartEmitHooks): Promise<void> {
|
async function emitPreparedGatewayRestart(hooks?: RestartEmitHooks): Promise<void> {
|
||||||
try {
|
let nextHooks = hooks ?? pendingRestartEmitHooks;
|
||||||
await hooks?.beforeEmit?.();
|
if (!hooks) {
|
||||||
} catch (err) {
|
pendingRestartEmitHooks = undefined;
|
||||||
restartLog.warn(`restart preparation failed; restart will continue without it: ${String(err)}`);
|
}
|
||||||
|
let preparedHooks: RestartEmitHooks | undefined;
|
||||||
|
while (nextHooks) {
|
||||||
|
if (preparedHooks) {
|
||||||
|
await preparedHooks.afterEmitRejected?.().catch(() => undefined);
|
||||||
|
preparedHooks = undefined;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
await nextHooks.beforeEmit?.();
|
||||||
|
preparedHooks = nextHooks;
|
||||||
|
} catch (err) {
|
||||||
|
restartLog.warn(
|
||||||
|
`restart preparation failed; restart will continue without it: ${String(err)}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if (hooks) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
nextHooks = pendingRestartEmitHooks;
|
||||||
|
pendingRestartEmitHooks = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
const emitted = emitGatewayRestart();
|
const emitted = emitGatewayRestart();
|
||||||
if (!emitted) {
|
if (!emitted) {
|
||||||
await hooks?.afterEmitRejected?.().catch(() => undefined);
|
await preparedHooks?.afterEmitRejected?.().catch(() => undefined);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -478,9 +499,9 @@ export function scheduleGatewaySigusr1Restart(opts?: {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pendingRestartTimer) {
|
if (pendingRestartTimer || pendingRestartPreparing) {
|
||||||
const remainingMs = Math.max(0, pendingRestartDueAt - nowMs);
|
const remainingMs = pendingRestartPreparing ? 0 : Math.max(0, pendingRestartDueAt - nowMs);
|
||||||
const shouldPullEarlier = requestedDueAt < pendingRestartDueAt;
|
const shouldPullEarlier = !pendingRestartPreparing && requestedDueAt < pendingRestartDueAt;
|
||||||
if (shouldPullEarlier) {
|
if (shouldPullEarlier) {
|
||||||
restartLog.warn(
|
restartLog.warn(
|
||||||
`restart request rescheduled earlier reason=${reason ?? "unspecified"} pendingReason=${pendingRestartReason ?? "unspecified"} oldDelayMs=${remainingMs} newDelayMs=${Math.max(0, requestedDueAt - nowMs)} ${formatRestartAudit(opts?.audit)}`,
|
`restart request rescheduled earlier reason=${reason ?? "unspecified"} pendingReason=${pendingRestartReason ?? "unspecified"} oldDelayMs=${remainingMs} newDelayMs=${Math.max(0, requestedDueAt - nowMs)} ${formatRestartAudit(opts?.audit)}`,
|
||||||
@@ -512,18 +533,16 @@ export function scheduleGatewaySigusr1Restart(opts?: {
|
|||||||
pendingRestartTimer = null;
|
pendingRestartTimer = null;
|
||||||
pendingRestartDueAt = 0;
|
pendingRestartDueAt = 0;
|
||||||
pendingRestartReason = undefined;
|
pendingRestartReason = undefined;
|
||||||
const emitHooks = pendingRestartEmitHooks;
|
pendingRestartPreparing = true;
|
||||||
pendingRestartEmitHooks = undefined;
|
|
||||||
const pendingCheck = preRestartCheck;
|
const pendingCheck = preRestartCheck;
|
||||||
if (!pendingCheck) {
|
if (!pendingCheck) {
|
||||||
void emitPreparedGatewayRestart(emitHooks);
|
void emitPreparedGatewayRestart();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const cfg = getRuntimeConfig();
|
const cfg = getRuntimeConfig();
|
||||||
deferGatewayRestartUntilIdle({
|
deferGatewayRestartUntilIdle({
|
||||||
getPendingCount: pendingCheck,
|
getPendingCount: pendingCheck,
|
||||||
maxWaitMs: cfg.gateway?.reload?.deferralTimeoutMs,
|
maxWaitMs: cfg.gateway?.reload?.deferralTimeoutMs,
|
||||||
emitHooks,
|
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
Math.max(0, requestedDueAt - nowMs),
|
Math.max(0, requestedDueAt - nowMs),
|
||||||
|
|||||||
Reference in New Issue
Block a user