mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-17 07:00:49 +00:00
refactor(qa): add shared QA channel contract and harden worker startup (#64562)
* refactor(qa): add shared transport contract and suite migration * refactor(qa): harden worker gateway startup * fix(qa): scope waits and sanitize shutdown artifacts * fix(qa): confine artifacts and redact preserved logs * fix(qa): block symlink escapes in artifact paths * fix(gateway): clear shutdown race timers * fix(qa): harden shutdown cleanup paths * fix(qa): sanitize gateway logs in thrown errors * fix(qa): harden suite startup and artifact paths * fix(qa): stage bundled plugins from mutated config * fix(qa): broaden gateway log bearer redaction * fix(qa-channel): restore runtime export * fix(qa): stop failed gateway startups as a process tree * fix(qa-channel): load runtime hook from api surface
This commit is contained in:
@@ -193,6 +193,68 @@ describe("server-channels auto restart", () => {
|
||||
expect(startAccount).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("consumes rejected stop tasks during manual abort", async () => {
|
||||
const unhandledRejection = vi.fn();
|
||||
process.on("unhandledRejection", unhandledRejection);
|
||||
try {
|
||||
const startAccount = vi.fn(
|
||||
async ({ abortSignal }: { abortSignal: AbortSignal }) =>
|
||||
await new Promise<void>((_resolve, reject) => {
|
||||
abortSignal.addEventListener(
|
||||
"abort",
|
||||
() => {
|
||||
reject(new Error("aborted"));
|
||||
},
|
||||
{ once: true },
|
||||
);
|
||||
}),
|
||||
);
|
||||
installTestRegistry(
|
||||
createTestPlugin({
|
||||
startAccount,
|
||||
}),
|
||||
);
|
||||
const manager = createManager();
|
||||
|
||||
await manager.startChannels();
|
||||
vi.runAllTicks();
|
||||
await manager.stopChannel("discord", DEFAULT_ACCOUNT_ID);
|
||||
await Promise.resolve();
|
||||
|
||||
expect(unhandledRejection).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
process.off("unhandledRejection", unhandledRejection);
|
||||
}
|
||||
});
|
||||
|
||||
it("does not allow a second account task to start when stop times out", async () => {
|
||||
const startAccount = vi.fn(
|
||||
async ({ abortSignal }: { abortSignal: AbortSignal }) =>
|
||||
await new Promise<void>(() => {
|
||||
abortSignal.addEventListener("abort", () => {}, { once: true });
|
||||
}),
|
||||
);
|
||||
installTestRegistry(
|
||||
createTestPlugin({
|
||||
startAccount,
|
||||
}),
|
||||
);
|
||||
const manager = createManager();
|
||||
|
||||
await manager.startChannels();
|
||||
const stopTask = manager.stopChannel("discord", DEFAULT_ACCOUNT_ID);
|
||||
await vi.advanceTimersByTimeAsync(5_000);
|
||||
await stopTask;
|
||||
await manager.startChannel("discord", DEFAULT_ACCOUNT_ID);
|
||||
|
||||
const snapshot = manager.getRuntimeSnapshot();
|
||||
const account = snapshot.channelAccounts.discord?.[DEFAULT_ACCOUNT_ID];
|
||||
expect(startAccount).toHaveBeenCalledTimes(1);
|
||||
expect(account?.running).toBe(true);
|
||||
expect(account?.restartPending).toBe(false);
|
||||
expect(account?.lastError).toContain("channel stop timed out");
|
||||
});
|
||||
|
||||
it("marks enabled/configured when account descriptors omit them", () => {
|
||||
installTestRegistry(
|
||||
createTestPlugin({
|
||||
|
||||
@@ -26,6 +26,7 @@ const CHANNEL_RESTART_POLICY: BackoffPolicy = {
|
||||
jitter: 0.1,
|
||||
};
|
||||
const MAX_RESTART_ATTEMPTS = 10;
|
||||
const CHANNEL_STOP_ABORT_TIMEOUT_MS = 5_000;
|
||||
|
||||
type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;
|
||||
|
||||
@@ -72,6 +73,31 @@ function cloneDefaultRuntime(channelId: ChannelId, accountId: string): ChannelAc
|
||||
return { ...resolveDefaultRuntime(channelId), accountId };
|
||||
}
|
||||
|
||||
async function waitForChannelStopGracefully(task: Promise<unknown> | undefined, timeoutMs: number) {
|
||||
if (!task) {
|
||||
return true;
|
||||
}
|
||||
return await new Promise<boolean>((resolve) => {
|
||||
let settled = false;
|
||||
const timer = setTimeout(() => {
|
||||
if (!settled) {
|
||||
settled = true;
|
||||
resolve(false);
|
||||
}
|
||||
}, timeoutMs);
|
||||
timer.unref?.();
|
||||
const resolveSettled = () => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
clearTimeout(timer);
|
||||
resolve(true);
|
||||
};
|
||||
void task.then(resolveSettled, resolveSettled);
|
||||
});
|
||||
}
|
||||
|
||||
function applyDescribedAccountFields(
|
||||
next: ChannelAccountSnapshot,
|
||||
described: ChannelAccountSnapshot | undefined,
|
||||
@@ -527,6 +553,7 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
|
||||
}
|
||||
manuallyStopped.add(restartKey(channelId, id));
|
||||
abort?.abort();
|
||||
const log = channelLogs[channelId];
|
||||
if (plugin?.gateway?.stopAccount) {
|
||||
const account = plugin.config.resolveAccount(cfg, id);
|
||||
await plugin.gateway.stopAccount({
|
||||
@@ -540,10 +567,21 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
|
||||
setStatus: (next) => setRuntime(channelId, id, next),
|
||||
});
|
||||
}
|
||||
try {
|
||||
await task;
|
||||
} catch {
|
||||
// ignore
|
||||
const stoppedCleanly = await waitForChannelStopGracefully(
|
||||
task,
|
||||
CHANNEL_STOP_ABORT_TIMEOUT_MS,
|
||||
);
|
||||
if (!stoppedCleanly) {
|
||||
log.warn?.(
|
||||
`[${id}] channel stop exceeded ${CHANNEL_STOP_ABORT_TIMEOUT_MS}ms after abort; continuing shutdown`,
|
||||
);
|
||||
setRuntime(channelId, id, {
|
||||
accountId: id,
|
||||
running: true,
|
||||
restartPending: false,
|
||||
lastError: `channel stop timed out after ${CHANNEL_STOP_ABORT_TIMEOUT_MS}ms`,
|
||||
});
|
||||
return;
|
||||
}
|
||||
store.aborts.delete(id);
|
||||
store.tasks.delete(id);
|
||||
|
||||
@@ -6,6 +6,8 @@ const mocks = {
|
||||
};
|
||||
const WEBSOCKET_CLOSE_GRACE_MS = 1_000;
|
||||
const WEBSOCKET_CLOSE_FORCE_CONTINUE_MS = 250;
|
||||
const HTTP_CLOSE_GRACE_MS = 1_000;
|
||||
const HTTP_CLOSE_FORCE_WAIT_MS = 5_000;
|
||||
|
||||
vi.mock("../channels/plugins/index.js", () => ({
|
||||
listChannelPlugins: () => [],
|
||||
@@ -133,6 +135,7 @@ describe("createGatewayCloseHandler", () => {
|
||||
await closePromise;
|
||||
|
||||
expect(terminate).toHaveBeenCalledTimes(1);
|
||||
expect(vi.getTimerCount()).toBe(0);
|
||||
expect(
|
||||
mocks.logWarn.mock.calls.some(([message]) =>
|
||||
String(message).includes("websocket server close exceeded 1000ms"),
|
||||
@@ -156,10 +159,109 @@ describe("createGatewayCloseHandler", () => {
|
||||
await vi.advanceTimersByTimeAsync(WEBSOCKET_CLOSE_GRACE_MS + WEBSOCKET_CLOSE_FORCE_CONTINUE_MS);
|
||||
await closePromise;
|
||||
|
||||
expect(vi.getTimerCount()).toBe(0);
|
||||
expect(
|
||||
mocks.logWarn.mock.calls.some(([message]) =>
|
||||
String(message).includes("websocket server close still pending after 250ms force window"),
|
||||
),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("forces lingering HTTP connections closed when server close exceeds the grace window", async () => {
|
||||
vi.useFakeTimers();
|
||||
|
||||
let closeCallback: ((err?: Error | null) => void) | null = null;
|
||||
const closeAllConnections = vi.fn(() => {
|
||||
closeCallback?.(null);
|
||||
});
|
||||
const close = createGatewayCloseHandler({
|
||||
bonjourStop: null,
|
||||
tailscaleCleanup: null,
|
||||
canvasHost: null,
|
||||
canvasHostServer: null,
|
||||
stopChannel: vi.fn(async () => undefined),
|
||||
pluginServices: null,
|
||||
cron: { stop: vi.fn() },
|
||||
heartbeatRunner: { stop: vi.fn() } as never,
|
||||
updateCheckStop: null,
|
||||
stopTaskRegistryMaintenance: null,
|
||||
nodePresenceTimers: new Map(),
|
||||
broadcast: vi.fn(),
|
||||
tickInterval: setInterval(() => undefined, 60_000),
|
||||
healthInterval: setInterval(() => undefined, 60_000),
|
||||
dedupeCleanup: setInterval(() => undefined, 60_000),
|
||||
mediaCleanup: null,
|
||||
agentUnsub: null,
|
||||
heartbeatUnsub: null,
|
||||
transcriptUnsub: null,
|
||||
lifecycleUnsub: null,
|
||||
chatRunState: { clear: vi.fn() },
|
||||
clients: new Set(),
|
||||
configReloader: { stop: vi.fn(async () => undefined) },
|
||||
wss: { close: (cb: () => void) => cb() } as never,
|
||||
httpServer: {
|
||||
close: (cb: (err?: Error | null) => void) => {
|
||||
closeCallback = cb;
|
||||
},
|
||||
closeAllConnections,
|
||||
closeIdleConnections: vi.fn(),
|
||||
} as never,
|
||||
});
|
||||
|
||||
const closePromise = close({ reason: "test shutdown" });
|
||||
await vi.advanceTimersByTimeAsync(HTTP_CLOSE_GRACE_MS);
|
||||
await closePromise;
|
||||
|
||||
expect(closeAllConnections).toHaveBeenCalledTimes(1);
|
||||
expect(vi.getTimerCount()).toBe(0);
|
||||
expect(
|
||||
mocks.logWarn.mock.calls.some(([message]) =>
|
||||
String(message).includes("http server close exceeded 1000ms"),
|
||||
),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("fails shutdown when http server close still hangs after force close", async () => {
|
||||
vi.useFakeTimers();
|
||||
|
||||
const close = createGatewayCloseHandler({
|
||||
bonjourStop: null,
|
||||
tailscaleCleanup: null,
|
||||
canvasHost: null,
|
||||
canvasHostServer: null,
|
||||
stopChannel: vi.fn(async () => undefined),
|
||||
pluginServices: null,
|
||||
cron: { stop: vi.fn() },
|
||||
heartbeatRunner: { stop: vi.fn() } as never,
|
||||
updateCheckStop: null,
|
||||
stopTaskRegistryMaintenance: null,
|
||||
nodePresenceTimers: new Map(),
|
||||
broadcast: vi.fn(),
|
||||
tickInterval: setInterval(() => undefined, 60_000),
|
||||
healthInterval: setInterval(() => undefined, 60_000),
|
||||
dedupeCleanup: setInterval(() => undefined, 60_000),
|
||||
mediaCleanup: null,
|
||||
agentUnsub: null,
|
||||
heartbeatUnsub: null,
|
||||
transcriptUnsub: null,
|
||||
lifecycleUnsub: null,
|
||||
chatRunState: { clear: vi.fn() },
|
||||
clients: new Set(),
|
||||
configReloader: { stop: vi.fn(async () => undefined) },
|
||||
wss: { close: (cb: () => void) => cb() } as never,
|
||||
httpServer: {
|
||||
close: () => undefined,
|
||||
closeAllConnections: vi.fn(),
|
||||
closeIdleConnections: vi.fn(),
|
||||
} as never,
|
||||
});
|
||||
|
||||
const closePromise = close({ reason: "test shutdown" });
|
||||
const closeExpectation = expect(closePromise).rejects.toThrow(
|
||||
"http server close still pending after forced connection shutdown (5000ms)",
|
||||
);
|
||||
await vi.advanceTimersByTimeAsync(HTTP_CLOSE_GRACE_MS + HTTP_CLOSE_FORCE_WAIT_MS);
|
||||
await closeExpectation;
|
||||
expect(vi.getTimerCount()).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -12,6 +12,31 @@ import { normalizeOptionalString } from "../shared/string-coerce.js";
|
||||
const shutdownLog = createSubsystemLogger("gateway/shutdown");
|
||||
const WEBSOCKET_CLOSE_GRACE_MS = 1_000;
|
||||
const WEBSOCKET_CLOSE_FORCE_CONTINUE_MS = 250;
|
||||
const HTTP_CLOSE_GRACE_MS = 1_000;
|
||||
const HTTP_CLOSE_FORCE_WAIT_MS = 5_000;
|
||||
|
||||
function createTimeoutRace<T>(timeoutMs: number, onTimeout: () => T) {
|
||||
let timer: ReturnType<typeof setTimeout> | null = setTimeout(() => {
|
||||
timer = null;
|
||||
resolve(onTimeout());
|
||||
}, timeoutMs);
|
||||
timer.unref?.();
|
||||
|
||||
let resolve!: (value: T) => void;
|
||||
const promise = new Promise<T>((innerResolve) => {
|
||||
resolve = innerResolve;
|
||||
});
|
||||
|
||||
return {
|
||||
promise,
|
||||
clear() {
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
timer = null;
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export async function runGatewayClosePrelude(params: {
|
||||
stopDiagnostics?: () => void;
|
||||
@@ -170,10 +195,15 @@ export function createGatewayCloseHandler(params: {
|
||||
await params.configReloader.stop().catch(() => {});
|
||||
const wsClients = params.wss.clients ?? new Set();
|
||||
const closePromise = new Promise<void>((resolve) => params.wss.close(() => resolve()));
|
||||
const websocketGraceTimeout = createTimeoutRace(
|
||||
WEBSOCKET_CLOSE_GRACE_MS,
|
||||
() => false as const,
|
||||
);
|
||||
const closedWithinGrace = await Promise.race([
|
||||
closePromise.then(() => true),
|
||||
new Promise<false>((resolve) => setTimeout(() => resolve(false), WEBSOCKET_CLOSE_GRACE_MS)),
|
||||
websocketGraceTimeout.promise,
|
||||
]);
|
||||
websocketGraceTimeout.clear();
|
||||
if (!closedWithinGrace) {
|
||||
shutdownLog.warn(
|
||||
`websocket server close exceeded ${WEBSOCKET_CLOSE_GRACE_MS}ms; forcing shutdown continuation with ${wsClients.size} tracked client(s)`,
|
||||
@@ -185,17 +215,13 @@ export function createGatewayCloseHandler(params: {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
await Promise.race([
|
||||
closePromise,
|
||||
new Promise<void>((resolve) =>
|
||||
setTimeout(() => {
|
||||
shutdownLog.warn(
|
||||
`websocket server close still pending after ${WEBSOCKET_CLOSE_FORCE_CONTINUE_MS}ms force window; continuing shutdown`,
|
||||
);
|
||||
resolve();
|
||||
}, WEBSOCKET_CLOSE_FORCE_CONTINUE_MS),
|
||||
),
|
||||
]);
|
||||
const websocketForceTimeout = createTimeoutRace(WEBSOCKET_CLOSE_FORCE_CONTINUE_MS, () => {
|
||||
shutdownLog.warn(
|
||||
`websocket server close still pending after ${WEBSOCKET_CLOSE_FORCE_CONTINUE_MS}ms force window; continuing shutdown`,
|
||||
);
|
||||
});
|
||||
await Promise.race([closePromise, websocketForceTimeout.promise]);
|
||||
websocketForceTimeout.clear();
|
||||
}
|
||||
const servers =
|
||||
params.httpServers && params.httpServers.length > 0
|
||||
@@ -203,14 +229,41 @@ export function createGatewayCloseHandler(params: {
|
||||
: [params.httpServer];
|
||||
for (const server of servers) {
|
||||
const httpServer = server as HttpServer & {
|
||||
closeAllConnections?: () => void;
|
||||
closeIdleConnections?: () => void;
|
||||
};
|
||||
if (typeof httpServer.closeIdleConnections === "function") {
|
||||
httpServer.closeIdleConnections();
|
||||
}
|
||||
await new Promise<void>((resolve, reject) =>
|
||||
const closePromise = new Promise<void>((resolve, reject) =>
|
||||
httpServer.close((err) => (err ? reject(err) : resolve())),
|
||||
);
|
||||
const httpGraceTimeout = createTimeoutRace(HTTP_CLOSE_GRACE_MS, () => false as const);
|
||||
const closedWithinGrace = await Promise.race([
|
||||
closePromise.then(() => true),
|
||||
httpGraceTimeout.promise,
|
||||
]);
|
||||
httpGraceTimeout.clear();
|
||||
if (!closedWithinGrace) {
|
||||
shutdownLog.warn(
|
||||
`http server close exceeded ${HTTP_CLOSE_GRACE_MS}ms; forcing connection shutdown and waiting for close`,
|
||||
);
|
||||
httpServer.closeAllConnections?.();
|
||||
const httpForceTimeout = createTimeoutRace(
|
||||
HTTP_CLOSE_FORCE_WAIT_MS,
|
||||
() => false as const,
|
||||
);
|
||||
const closedAfterForce = await Promise.race([
|
||||
closePromise.then(() => true),
|
||||
httpForceTimeout.promise,
|
||||
]);
|
||||
httpForceTimeout.clear();
|
||||
if (!closedAfterForce) {
|
||||
throw new Error(
|
||||
`http server close still pending after forced connection shutdown (${HTTP_CLOSE_FORCE_WAIT_MS}ms)`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user