diff --git a/CHANGELOG.md b/CHANGELOG.md index 26053fc123e..7cb904e4e42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Control UI/sessions: bound the default Sessions tab query to recent activity and fewer rows, avoiding expensive full-history loads while keeping filters editable. Fixes #76050. (#76051) Thanks @Neomail2. +- Gateway/channels: cap startup fanout at four channel/account handoffs and recover from Bonjour ciao self-probe races, reducing Windows startup stalls with many Telegram accounts. Fixes #75687. - CLI/update: treat inherited Gateway service markers as origin hints and only block package replacement when the managed Gateway is still live, so self-updates can stop the service and continue safely. (#75729) Thanks @hxy91819. - Agents/failover: exempt run-level timeouts that fire during tool execution from model fallback, timeout-triggered compaction, and generic timeout payload synthesis. Long `process(poll)`, browser, or `exec` tool calls that exceed `agents.defaults.timeoutSeconds` previously rotated auth profiles, switched to a fallback model, and surfaced a misleading "LLM request timed out" error even though the primary model had already responded. Mirrors the existing `timedOutDuringCompaction` precedent (#46889). Fixes #52147. (#75873) Thanks @simonusa. diff --git a/extensions/bonjour/src/advertiser.test.ts b/extensions/bonjour/src/advertiser.test.ts index f296b3653ea..d5c9e732c9d 100644 --- a/extensions/bonjour/src/advertiser.test.ts +++ b/extensions/bonjour/src/advertiser.test.ts @@ -420,6 +420,18 @@ describe("gateway bonjour advertiser", () => { expect.stringContaining("suppressing ciao netmask assertion"), ); + logger.warn.mockClear(); + expect( + handler?.( + new Error( + "Can't probe for a service which is announced already. Received announcing for service OpenClaw Gateway._openclaw._tcp.local.", + ), + ), + ).toBe(true); + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining("suppressing ciao self-probe race"), + ); + await started.stop(); }); diff --git a/extensions/bonjour/src/advertiser.ts b/extensions/bonjour/src/advertiser.ts index 251ad65413a..7b48f40409e 100644 --- a/extensions/bonjour/src/advertiser.ts +++ b/extensions/bonjour/src/advertiser.ts @@ -400,7 +400,11 @@ export async function startGatewayBonjourAdvertiser( ); } else { const label = - classification.kind === "netmask-assertion" ? "netmask assertion" : "interface assertion"; + classification.kind === "netmask-assertion" + ? "netmask assertion" + : classification.kind === "self-probe" + ? "self-probe race" + : "interface assertion"; logger.warn(`bonjour: suppressing ciao ${label}: ${classification.formatted}`); requestCiaoRecovery?.(classification); } diff --git a/extensions/bonjour/src/ciao.test.ts b/extensions/bonjour/src/ciao.test.ts index 252b41399e1..5798efad94c 100644 --- a/extensions/bonjour/src/ciao.test.ts +++ b/extensions/bonjour/src/ciao.test.ts @@ -49,6 +49,20 @@ describe("bonjour-ciao", () => { }); }); + it("classifies ciao self-probe races separately from side effects", () => { + expect( + classifyCiaoUnhandledRejection( + new Error( + "Can't probe for a service which is announced already. Received announcing for service OpenClaw Gateway._openclaw._tcp.local.", + ), + ), + ).toEqual({ + kind: "self-probe", + formatted: + "Can't probe for a service which is announced already. Received announcing for service OpenClaw Gateway._openclaw._tcp.local.", + }); + }); + it("suppresses ciao announcement cancellation rejections", () => { expect(ignoreCiaoUnhandledRejection(new Error("Ciao announcement cancelled by shutdown"))).toBe( true, diff --git a/extensions/bonjour/src/ciao.ts b/extensions/bonjour/src/ciao.ts index da623b27856..c155278b0f9 100644 --- a/extensions/bonjour/src/ciao.ts +++ b/extensions/bonjour/src/ciao.ts @@ -5,6 +5,8 @@ const CIAO_INTERFACE_ASSERTION_MESSAGE_RE = /REACHED ILLEGAL STATE!?\s+IPV4 ADDRESS CHANGED? FROM (?:DEFINED TO UNDEFINED|UNDEFINED TO DEFINED)!?/u; const CIAO_NETMASK_ASSERTION_MESSAGE_RE = /IP ADDRESS VERSION MUST MATCH\.\s+NETMASK CANNOT HAVE A VERSION DIFFERENT FROM THE ADDRESS!?/u; +const CIAO_SELF_PROBE_MESSAGE_RE = + /CAN'T PROBE FOR A SERVICE WHICH IS ANNOUNCED ALREADY\.\s+RECEIVED (?:PROBING|ANNOUNCING|ANNOUNCED) FOR SERVICE\b/u; // Restricted sandboxes (NemoClaw, Docker-in-Docker, k3s with locked-down policy) // can refuse os.networkInterfaces(), which ciao calls during NetworkManager init. // Node surfaces this as a SystemError mentioning the libuv syscall by name. @@ -14,6 +16,7 @@ export type CiaoProcessErrorClassification = | { kind: "cancellation"; formatted: string } | { kind: "interface-assertion"; formatted: string } | { kind: "netmask-assertion"; formatted: string } + | { kind: "self-probe"; formatted: string } | { kind: "interface-enumeration-failure"; formatted: string }; function collectCiaoProcessErrorCandidates(reason: unknown): unknown[] { @@ -69,6 +72,9 @@ export function classifyCiaoProcessError(reason: unknown): CiaoProcessErrorClass if (CIAO_NETMASK_ASSERTION_MESSAGE_RE.test(message)) { return { kind: "netmask-assertion", formatted }; } + if (CIAO_SELF_PROBE_MESSAGE_RE.test(message)) { + return { kind: "self-probe", formatted }; + } if (CIAO_INTERFACE_ENUMERATION_FAILURE_RE.test(message)) { return { kind: "interface-enumeration-failure", formatted }; } diff --git a/src/gateway/server-channels.test.ts b/src/gateway/server-channels.test.ts index 2722c97afa6..9a6f0e34fc3 100644 --- a/src/gateway/server-channels.test.ts +++ b/src/gateway/server-channels.test.ts @@ -104,6 +104,26 @@ function createDeferred(): { promise: Promise; resolve: () => void } { return { promise, resolve: resolvePromise }; } +async function flushMicrotasks(times = 8): Promise { + for (let i = 0; i < times; i += 1) { + await Promise.resolve(); + } +} + +async function waitForMicrotaskCondition( + check: () => boolean, + message: string, + attempts = 100, +): Promise { + for (let i = 0; i < attempts; i += 1) { + if (check()) { + return; + } + await Promise.resolve(); + } + throw new Error(message); +} + function installTestRegistry( ...plugins: Array< ChannelPlugin | { plugin: ChannelPlugin; origin: string } @@ -618,6 +638,106 @@ describe("server-channels auto restart", () => { ); }); + it("limits whole-channel account startup fanout to four", async () => { + const accountIds = ["one", "two", "three", "four", "five", "six"]; + const releases: Array<() => void> = []; + let active = 0; + let maxActive = 0; + const isConfigured = vi.fn(async () => { + active += 1; + maxActive = Math.max(maxActive, active); + await new Promise((resolve) => { + releases.push(resolve); + }); + active -= 1; + return true; + }); + const startAccount = vi.fn( + async ({ abortSignal }: { abortSignal: AbortSignal }) => + await new Promise((resolve) => { + abortSignal.addEventListener("abort", () => resolve(), { once: true }); + }), + ); + installTestRegistry( + createTestPlugin({ + listAccountIds: () => accountIds, + isConfigured, + startAccount, + }), + ); + const manager = createManager(); + + const start = manager.startChannel("discord"); + await flushMicrotasks(); + + expect(isConfigured).toHaveBeenCalledTimes(4); + expect(maxActive).toBe(4); + expect(startAccount).not.toHaveBeenCalled(); + + releases.splice(0, 4).forEach((release) => release()); + await waitForMicrotaskCondition( + () => isConfigured.mock.calls.length === 6, + "expected second account startup wave", + ); + + expect(isConfigured).toHaveBeenCalledTimes(6); + expect(maxActive).toBe(4); + + releases.splice(0).forEach((release) => release()); + await start; + expect(startAccount).toHaveBeenCalledTimes(6); + + await manager.stopChannel("discord"); + }); + + it("limits channel plugin startup fanout to four", async () => { + const channelIds = Array.from({ length: 6 }, (_, index) => `test-${index}` as ChannelId); + const releases: Array<() => void> = []; + let active = 0; + let maxActive = 0; + const plugins = channelIds.map((id, index) => + createTestPlugin({ + id, + order: index, + isConfigured: async () => { + active += 1; + maxActive = Math.max(maxActive, active); + await new Promise((resolve) => { + releases.push(resolve); + }); + active -= 1; + return true; + }, + startAccount: async ({ abortSignal }) => + await new Promise((resolve) => { + abortSignal.addEventListener("abort", () => resolve(), { once: true }); + }), + }), + ); + installTestRegistry(...plugins); + const manager = createManager({ channelIds }); + + const start = manager.startChannels(); + await flushMicrotasks(); + + expect(releases).toHaveLength(4); + expect(maxActive).toBe(4); + + releases.splice(0, 4).forEach((release) => release()); + await waitForMicrotaskCondition( + () => releases.length === 2, + "expected second channel startup wave", + ); + + expect(releases).toHaveLength(2); + expect(maxActive).toBe(4); + + releases.splice(0).forEach((release) => release()); + await start; + + await Promise.all(channelIds.map((id) => manager.stopChannel(id))); + }); + it("evicts stale account lifecycle state during whole-channel reload", async () => { let accountIds = [DEFAULT_ACCOUNT_ID]; const startAccount = vi.fn( diff --git a/src/gateway/server-channels.ts b/src/gateway/server-channels.ts index fa7764b17e4..4c5d019e82e 100644 --- a/src/gateway/server-channels.ts +++ b/src/gateway/server-channels.ts @@ -25,6 +25,7 @@ import { normalizeOptionalAccountId, } from "../routing/session-key.js"; import type { RuntimeEnv } from "../runtime.js"; +import { runTasksWithConcurrency } from "../utils/run-with-concurrency.js"; import type { ChannelRuntimeSnapshot } from "./server-channel-runtime.types.js"; export type { ChannelRuntimeSnapshot }; @@ -36,6 +37,7 @@ const CHANNEL_RESTART_POLICY: BackoffPolicy = { }; const MAX_RESTART_ATTEMPTS = 10; const CHANNEL_STOP_ABORT_TIMEOUT_MS = 5_000; +const CHANNEL_STARTUP_CONCURRENCY = 4; type ChannelRuntimeStore = { aborts: Map; @@ -376,8 +378,9 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage return; } - await Promise.all( - accountIds.map(async (id) => { + const startup = await runTasksWithConcurrency({ + limit: CHANNEL_STARTUP_CONCURRENCY, + tasks: accountIds.map((id) => async () => { if (store.tasks.has(id)) { return; } @@ -617,7 +620,10 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage } } }), - ); + }); + if (startup.hasError) { + throw startup.firstError; + } }; const startChannel = async (channelId: ChannelId, accountId?: string) => { @@ -696,25 +702,18 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage }; const startChannels = async () => { - const pending = [...listChannelPlugins()]; - const workerCount = Math.min(8, pending.length); - await Promise.all( - Array.from({ length: workerCount }, async () => { - for (;;) { - const plugin = pending.shift(); - if (!plugin) { - return; - } - try { - await measureStartup(`channels.${plugin.id}.start`, () => startChannel(plugin.id)); - } catch (err) { - ensureChannelLog(plugin.id).error?.( - `[${plugin.id}] channel startup failed: ${formatErrorMessage(err)}`, - ); - } + await runTasksWithConcurrency({ + limit: CHANNEL_STARTUP_CONCURRENCY, + tasks: [...listChannelPlugins()].map((plugin) => async () => { + try { + await measureStartup(`channels.${plugin.id}.start`, () => startChannel(plugin.id)); + } catch (err) { + ensureChannelLog(plugin.id).error?.( + `[${plugin.id}] channel startup failed: ${formatErrorMessage(err)}`, + ); } }), - ); + }); }; const markChannelLoggedOut = (channelId: ChannelId, cleared: boolean, accountId?: string) => {