fix(gateway): cap channel startup fanout

This commit is contained in:
Peter Steinberger
2026-05-02 15:26:25 +01:00
parent 829364f85e
commit bf2711b40e
7 changed files with 177 additions and 21 deletions

View File

@@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Control UI/sessions: bound the default Sessions tab query to recent activity and fewer rows, avoiding expensive full-history loads while keeping filters editable. Fixes #76050. (#76051) Thanks @Neomail2.
- Gateway/channels: cap startup fanout at four channel/account handoffs and recover from Bonjour ciao self-probe races, reducing Windows startup stalls with many Telegram accounts. Fixes #75687.
- CLI/update: treat inherited Gateway service markers as origin hints and only block package replacement when the managed Gateway is still live, so self-updates can stop the service and continue safely. (#75729) Thanks @hxy91819.
- Agents/failover: exempt run-level timeouts that fire during tool execution from model fallback, timeout-triggered compaction, and generic timeout payload synthesis. Long `process(poll)`, browser, or `exec` tool calls that exceed `agents.defaults.timeoutSeconds` previously rotated auth profiles, switched to a fallback model, and surfaced a misleading "LLM request timed out" error even though the primary model had already responded. Mirrors the existing `timedOutDuringCompaction` precedent (#46889). Fixes #52147. (#75873) Thanks @simonusa.

View File

@@ -420,6 +420,18 @@ describe("gateway bonjour advertiser", () => {
expect.stringContaining("suppressing ciao netmask assertion"),
);
logger.warn.mockClear();
expect(
handler?.(
new Error(
"Can't probe for a service which is announced already. Received announcing for service OpenClaw Gateway._openclaw._tcp.local.",
),
),
).toBe(true);
expect(logger.warn).toHaveBeenCalledWith(
expect.stringContaining("suppressing ciao self-probe race"),
);
await started.stop();
});

View File

@@ -400,7 +400,11 @@ export async function startGatewayBonjourAdvertiser(
);
} else {
const label =
classification.kind === "netmask-assertion" ? "netmask assertion" : "interface assertion";
classification.kind === "netmask-assertion"
? "netmask assertion"
: classification.kind === "self-probe"
? "self-probe race"
: "interface assertion";
logger.warn(`bonjour: suppressing ciao ${label}: ${classification.formatted}`);
requestCiaoRecovery?.(classification);
}

View File

@@ -49,6 +49,20 @@ describe("bonjour-ciao", () => {
});
});
it("classifies ciao self-probe races separately from side effects", () => {
expect(
classifyCiaoUnhandledRejection(
new Error(
"Can't probe for a service which is announced already. Received announcing for service OpenClaw Gateway._openclaw._tcp.local.",
),
),
).toEqual({
kind: "self-probe",
formatted:
"Can't probe for a service which is announced already. Received announcing for service OpenClaw Gateway._openclaw._tcp.local.",
});
});
it("suppresses ciao announcement cancellation rejections", () => {
expect(ignoreCiaoUnhandledRejection(new Error("Ciao announcement cancelled by shutdown"))).toBe(
true,

View File

@@ -5,6 +5,8 @@ const CIAO_INTERFACE_ASSERTION_MESSAGE_RE =
/REACHED ILLEGAL STATE!?\s+IPV4 ADDRESS CHANGED? FROM (?:DEFINED TO UNDEFINED|UNDEFINED TO DEFINED)!?/u;
const CIAO_NETMASK_ASSERTION_MESSAGE_RE =
/IP ADDRESS VERSION MUST MATCH\.\s+NETMASK CANNOT HAVE A VERSION DIFFERENT FROM THE ADDRESS!?/u;
const CIAO_SELF_PROBE_MESSAGE_RE =
/CAN'T PROBE FOR A SERVICE WHICH IS ANNOUNCED ALREADY\.\s+RECEIVED (?:PROBING|ANNOUNCING|ANNOUNCED) FOR SERVICE\b/u;
// Restricted sandboxes (NemoClaw, Docker-in-Docker, k3s with locked-down policy)
// can refuse os.networkInterfaces(), which ciao calls during NetworkManager init.
// Node surfaces this as a SystemError mentioning the libuv syscall by name.
@@ -14,6 +16,7 @@ export type CiaoProcessErrorClassification =
| { kind: "cancellation"; formatted: string }
| { kind: "interface-assertion"; formatted: string }
| { kind: "netmask-assertion"; formatted: string }
| { kind: "self-probe"; formatted: string }
| { kind: "interface-enumeration-failure"; formatted: string };
function collectCiaoProcessErrorCandidates(reason: unknown): unknown[] {
@@ -69,6 +72,9 @@ export function classifyCiaoProcessError(reason: unknown): CiaoProcessErrorClass
if (CIAO_NETMASK_ASSERTION_MESSAGE_RE.test(message)) {
return { kind: "netmask-assertion", formatted };
}
if (CIAO_SELF_PROBE_MESSAGE_RE.test(message)) {
return { kind: "self-probe", formatted };
}
if (CIAO_INTERFACE_ENUMERATION_FAILURE_RE.test(message)) {
return { kind: "interface-enumeration-failure", formatted };
}

View File

@@ -104,6 +104,26 @@ function createDeferred(): { promise: Promise<void>; resolve: () => void } {
return { promise, resolve: resolvePromise };
}
async function flushMicrotasks(times = 8): Promise<void> {
for (let i = 0; i < times; i += 1) {
await Promise.resolve();
}
}
async function waitForMicrotaskCondition(
check: () => boolean,
message: string,
attempts = 100,
): Promise<void> {
for (let i = 0; i < attempts; i += 1) {
if (check()) {
return;
}
await Promise.resolve();
}
throw new Error(message);
}
function installTestRegistry(
...plugins: Array<
ChannelPlugin<TestAccount> | { plugin: ChannelPlugin<TestAccount>; origin: string }
@@ -618,6 +638,106 @@ describe("server-channels auto restart", () => {
);
});
it("limits whole-channel account startup fanout to four", async () => {
const accountIds = ["one", "two", "three", "four", "five", "six"];
const releases: Array<() => void> = [];
let active = 0;
let maxActive = 0;
const isConfigured = vi.fn(async () => {
active += 1;
maxActive = Math.max(maxActive, active);
await new Promise<void>((resolve) => {
releases.push(resolve);
});
active -= 1;
return true;
});
const startAccount = vi.fn(
async ({ abortSignal }: { abortSignal: AbortSignal }) =>
await new Promise<void>((resolve) => {
abortSignal.addEventListener("abort", () => resolve(), { once: true });
}),
);
installTestRegistry(
createTestPlugin({
listAccountIds: () => accountIds,
isConfigured,
startAccount,
}),
);
const manager = createManager();
const start = manager.startChannel("discord");
await flushMicrotasks();
expect(isConfigured).toHaveBeenCalledTimes(4);
expect(maxActive).toBe(4);
expect(startAccount).not.toHaveBeenCalled();
releases.splice(0, 4).forEach((release) => release());
await waitForMicrotaskCondition(
() => isConfigured.mock.calls.length === 6,
"expected second account startup wave",
);
expect(isConfigured).toHaveBeenCalledTimes(6);
expect(maxActive).toBe(4);
releases.splice(0).forEach((release) => release());
await start;
expect(startAccount).toHaveBeenCalledTimes(6);
await manager.stopChannel("discord");
});
it("limits channel plugin startup fanout to four", async () => {
const channelIds = Array.from({ length: 6 }, (_, index) => `test-${index}` as ChannelId);
const releases: Array<() => void> = [];
let active = 0;
let maxActive = 0;
const plugins = channelIds.map((id, index) =>
createTestPlugin({
id,
order: index,
isConfigured: async () => {
active += 1;
maxActive = Math.max(maxActive, active);
await new Promise<void>((resolve) => {
releases.push(resolve);
});
active -= 1;
return true;
},
startAccount: async ({ abortSignal }) =>
await new Promise<void>((resolve) => {
abortSignal.addEventListener("abort", () => resolve(), { once: true });
}),
}),
);
installTestRegistry(...plugins);
const manager = createManager({ channelIds });
const start = manager.startChannels();
await flushMicrotasks();
expect(releases).toHaveLength(4);
expect(maxActive).toBe(4);
releases.splice(0, 4).forEach((release) => release());
await waitForMicrotaskCondition(
() => releases.length === 2,
"expected second channel startup wave",
);
expect(releases).toHaveLength(2);
expect(maxActive).toBe(4);
releases.splice(0).forEach((release) => release());
await start;
await Promise.all(channelIds.map((id) => manager.stopChannel(id)));
});
it("evicts stale account lifecycle state during whole-channel reload", async () => {
let accountIds = [DEFAULT_ACCOUNT_ID];
const startAccount = vi.fn(

View File

@@ -25,6 +25,7 @@ import {
normalizeOptionalAccountId,
} from "../routing/session-key.js";
import type { RuntimeEnv } from "../runtime.js";
import { runTasksWithConcurrency } from "../utils/run-with-concurrency.js";
import type { ChannelRuntimeSnapshot } from "./server-channel-runtime.types.js";
export type { ChannelRuntimeSnapshot };
@@ -36,6 +37,7 @@ const CHANNEL_RESTART_POLICY: BackoffPolicy = {
};
const MAX_RESTART_ATTEMPTS = 10;
const CHANNEL_STOP_ABORT_TIMEOUT_MS = 5_000;
const CHANNEL_STARTUP_CONCURRENCY = 4;
type ChannelRuntimeStore = {
aborts: Map<string, AbortController>;
@@ -376,8 +378,9 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
return;
}
await Promise.all(
accountIds.map(async (id) => {
const startup = await runTasksWithConcurrency({
limit: CHANNEL_STARTUP_CONCURRENCY,
tasks: accountIds.map((id) => async () => {
if (store.tasks.has(id)) {
return;
}
@@ -617,7 +620,10 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
}
}
}),
);
});
if (startup.hasError) {
throw startup.firstError;
}
};
const startChannel = async (channelId: ChannelId, accountId?: string) => {
@@ -696,25 +702,18 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
};
const startChannels = async () => {
const pending = [...listChannelPlugins()];
const workerCount = Math.min(8, pending.length);
await Promise.all(
Array.from({ length: workerCount }, async () => {
for (;;) {
const plugin = pending.shift();
if (!plugin) {
return;
}
try {
await measureStartup(`channels.${plugin.id}.start`, () => startChannel(plugin.id));
} catch (err) {
ensureChannelLog(plugin.id).error?.(
`[${plugin.id}] channel startup failed: ${formatErrorMessage(err)}`,
);
}
await runTasksWithConcurrency({
limit: CHANNEL_STARTUP_CONCURRENCY,
tasks: [...listChannelPlugins()].map((plugin) => async () => {
try {
await measureStartup(`channels.${plugin.id}.start`, () => startChannel(plugin.id));
} catch (err) {
ensureChannelLog(plugin.id).error?.(
`[${plugin.id}] channel startup failed: ${formatErrorMessage(err)}`,
);
}
}),
);
});
};
const markChannelLoggedOut = (channelId: ChannelId, cleared: boolean, accountId?: string) => {