diff --git a/CHANGELOG.md b/CHANGELOG.md index 01c3cfebc66..72153a51f99 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,6 +71,7 @@ Docs: https://docs.openclaw.ai - Webhooks/Gmail/Windows: resolve `gcloud`, `gog`, and `tailscale` PATH/PATHEXT shims before setup and watcher spawns, using the Windows-safe `.cmd` wrapper for long-lived `gog serve` processes. (#74881, fixes #54470) Thanks @Angfr95. - Video generation: accept provider-specific aspect-ratio and resolution hints at the tool boundary, normalize `720P` to MiniMax's supported `768P`, and stop sending Google `generateAudio` on Gemini video requests so provider fallback can recover from model-specific parameter differences. Thanks @vincentkoc. - Plugins/install: honor the beta update channel for onboarding and doctor-managed plugin installs by requesting floating npm and ClawHub specs with `@beta` while keeping persistent install records on the catalog default. Thanks @vincentkoc. +- Slack: keep health-monitor recovery stops from poisoning manual-stop state after channel stop timeouts, allowing Socket Mode accounts to reconnect after event-loop stalls instead of staying dead until Gateway restart. Fixes #77651. Thanks @Gusty3055. - WhatsApp/onboarding: canonicalize setup and pairing allowlist entries to WhatsApp's digit-only phone ids while still accepting E.164, JID, and `whatsapp:` inputs, so personal-phone allowlists match WhatsApp Web sender ids after setup. Thanks @vincentkoc. - Gateway/startup: load provider plugins that own explicitly configured image, video, or music generation defaults so generation tools become live after gateway restart instead of remaining catalog-only. Fixes #77244. Thanks @buyuangtampan, @Nikoxx99, and @vincentkoc. - Control UI/chat: suppress `HEARTBEAT_OK` acknowledgement history, streams, deltas, and final events before they enter the transcript view, so repeated heartbeat no-op turns do not stack noisy bubbles. Thanks @BunsDev. diff --git a/src/gateway/channel-health-monitor.test.ts b/src/gateway/channel-health-monitor.test.ts index acc32cee6f1..9599f4bee93 100644 --- a/src/gateway/channel-health-monitor.test.ts +++ b/src/gateway/channel-health-monitor.test.ts @@ -131,7 +131,7 @@ async function expectRestartedChannel( accountId = "default", ) { const monitor = await startAndRunCheck(manager); - expect(manager.stopChannel).toHaveBeenCalledWith(channel, accountId); + expect(manager.stopChannel).toHaveBeenCalledWith(channel, accountId, { manual: false }); expect(manager.startChannel).toHaveBeenCalledWith(channel, accountId); monitor.stop(); } @@ -286,9 +286,9 @@ describe("channel-health-monitor", () => { }, ); const monitor = await startAndRunCheck(manager); - expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default"); + expect(manager.stopChannel).toHaveBeenCalledWith("discord", "default", { manual: false }); expect(manager.startChannel).toHaveBeenCalledWith("discord", "default"); - expect(manager.stopChannel).not.toHaveBeenCalledWith("discord", "quiet"); + expect(manager.stopChannel).not.toHaveBeenCalledWith("discord", "quiet", { manual: false }); expect(manager.startChannel).not.toHaveBeenCalledWith("discord", "quiet"); monitor.stop(); }); @@ -308,7 +308,7 @@ describe("channel-health-monitor", () => { }, }); const monitor = await startAndRunCheck(manager); - expect(manager.stopChannel).toHaveBeenCalledWith("whatsapp", "default"); + expect(manager.stopChannel).toHaveBeenCalledWith("whatsapp", "default", { manual: false }); expect(manager.resetRestartAttempts).toHaveBeenCalledWith("whatsapp", "default"); expect(manager.startChannel).toHaveBeenCalledWith("whatsapp", "default"); monitor.stop(); @@ -613,7 +613,7 @@ describe("channel-health-monitor", () => { const monitor = await startAndRunCheck(manager, { staleEventThresholdMs: customThreshold, }); - expect(manager.stopChannel).toHaveBeenCalledWith("slack", "default"); + expect(manager.stopChannel).toHaveBeenCalledWith("slack", "default", { manual: false }); expect(manager.startChannel).toHaveBeenCalledWith("slack", "default"); monitor.stop(); }); diff --git a/src/gateway/channel-health-monitor.ts b/src/gateway/channel-health-monitor.ts index 20a3f7728c6..2df76406928 100644 --- a/src/gateway/channel-health-monitor.ts +++ b/src/gateway/channel-health-monitor.ts @@ -163,7 +163,9 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann try { if (status.running) { - await channelManager.stopChannel(channelId as ChannelId, accountId); + await channelManager.stopChannel(channelId as ChannelId, accountId, { + manual: false, + }); } channelManager.resetRestartAttempts(channelId as ChannelId, accountId); await channelManager.startChannel(channelId as ChannelId, accountId); diff --git a/src/gateway/server-channels.test.ts b/src/gateway/server-channels.test.ts index 9a6f0e34fc3..9dfccc6e857 100644 --- a/src/gateway/server-channels.test.ts +++ b/src/gateway/server-channels.test.ts @@ -329,6 +329,85 @@ describe("server-channels auto restart", () => { expect(account?.lastError).toContain("channel stop timed out"); }); + it("does not poison auto-restart state when recovery stop times out", async () => { + const releaseFirstTask = createDeferred(); + const startAccount = vi.fn( + async ({ abortSignal }: { abortSignal: AbortSignal }) => + await new Promise((resolve) => { + abortSignal.addEventListener("abort", () => {}, { once: true }); + void releaseFirstTask.promise.then(resolve); + }), + ); + installTestRegistry( + createTestPlugin({ + startAccount, + }), + ); + const manager = createManager(); + + await manager.startChannels(); + const stopTask = manager.stopChannel("discord", DEFAULT_ACCOUNT_ID, { manual: false }); + await vi.advanceTimersByTimeAsync(5_000); + await stopTask; + await manager.startChannel("discord", DEFAULT_ACCOUNT_ID); + + const snapshot = manager.getRuntimeSnapshot(); + const account = snapshot.channelAccounts.discord?.[DEFAULT_ACCOUNT_ID]; + expect(startAccount).toHaveBeenCalledTimes(1); + expect(account?.running).toBe(false); + expect(account?.restartPending).toBe(true); + expect(account?.lastError).toContain("channel stop timed out"); + expect(manager.isManuallyStopped("discord", DEFAULT_ACCOUNT_ID)).toBe(false); + + releaseFirstTask.resolve(); + await flushMicrotasks(); + await vi.advanceTimersByTimeAsync(10); + await flushMicrotasks(); + + expect(startAccount).toHaveBeenCalledTimes(2); + }); + + it("lets manual stops cancel recovery backoff after recovery stop times out", async () => { + const releaseFirstTask = createDeferred(); + const startAccount = vi.fn( + async ({ abortSignal }: { abortSignal: AbortSignal }) => + await new Promise((resolve) => { + abortSignal.addEventListener("abort", () => {}, { once: true }); + void releaseFirstTask.promise.then(resolve); + }), + ); + installTestRegistry( + createTestPlugin({ + startAccount, + }), + ); + const manager = createManager(); + + await manager.startChannels(); + const recoveryStopTask = manager.stopChannel("discord", DEFAULT_ACCOUNT_ID, { + manual: false, + }); + await vi.advanceTimersByTimeAsync(5_000); + await recoveryStopTask; + + releaseFirstTask.resolve(); + await waitForMicrotaskCondition( + () => hoisted.sleepWithAbort.mock.calls.length > 0, + "expected recovery restart backoff to be scheduled", + ); + expect(hoisted.sleepWithAbort).toHaveBeenCalledWith(10, expect.any(AbortSignal)); + + await manager.stopChannel("discord", DEFAULT_ACCOUNT_ID); + await vi.advanceTimersByTimeAsync(10); + await flushMicrotasks(); + + const account = manager.getRuntimeSnapshot().channelAccounts.discord?.[DEFAULT_ACCOUNT_ID]; + expect(startAccount).toHaveBeenCalledTimes(1); + expect(account?.running).toBe(false); + expect(account?.restartPending).toBe(false); + expect(manager.isManuallyStopped("discord", DEFAULT_ACCOUNT_ID)).toBe(true); + }); + it("marks enabled/configured when account descriptors omit them", () => { installTestRegistry( createTestPlugin({ diff --git a/src/gateway/server-channels.ts b/src/gateway/server-channels.ts index 4c5d019e82e..2d858cc619b 100644 --- a/src/gateway/server-channels.ts +++ b/src/gateway/server-channels.ts @@ -188,11 +188,15 @@ type StartChannelOptions = { preserveManualStop?: boolean; }; +type StopChannelOptions = { + manual?: boolean; +}; + export type ChannelManager = { getRuntimeSnapshot: () => ChannelRuntimeSnapshot; startChannels: () => Promise; startChannel: (channel: ChannelId, accountId?: string) => Promise; - stopChannel: (channel: ChannelId, accountId?: string) => Promise; + stopChannel: (channel: ChannelId, accountId?: string, opts?: StopChannelOptions) => Promise; markChannelLoggedOut: (channelId: ChannelId, cleared: boolean, accountId?: string) => void; isManuallyStopped: (channelId: ChannelId, accountId: string) => boolean; resetRestartAttempts: (channelId: ChannelId, accountId: string) => void; @@ -216,6 +220,7 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage const restartAttempts = new Map(); // Tracks accounts that were manually stopped so we don't auto-restart them. const manuallyStopped = new Set(); + const recoveryStopTimedOut = new Set(); const restartKey = (channelId: ChannelId, accountId: string) => `${channelId}:${accountId}`; const ensureChannelLog = (channelId: ChannelId): SubsystemLogger => { @@ -568,15 +573,24 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage restartPending: true, reconnectAttempts: attempt, }); + const recoveryRestartSleepAbort = recoveryStopTimedOut.has(rKey) + ? new AbortController() + : undefined; + if (recoveryRestartSleepAbort) { + store.aborts.set(id, recoveryRestartSleepAbort); + } try { - await sleepWithAbort(delayMs, abort.signal); + const restartSleepAbort = recoveryRestartSleepAbort?.signal ?? abort.signal; + await sleepWithAbort(delayMs, restartSleepAbort); if (manuallyStopped.has(rKey)) { + recoveryStopTimedOut.delete(rKey); return; } + recoveryStopTimedOut.delete(rKey); if (store.tasks.get(id) === trackedPromise) { store.tasks.delete(id); } - if (store.aborts.get(id) === abort) { + if (store.aborts.get(id) === (recoveryRestartSleepAbort ?? abort)) { store.aborts.delete(id); } await startChannelInternal(channelId, id, { @@ -585,6 +599,13 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage }); } catch { // abort or startup failure — next crash will retry + } finally { + if (recoveryRestartSleepAbort) { + recoveryStopTimedOut.delete(rKey); + if (store.aborts.get(id) === recoveryRestartSleepAbort) { + store.aborts.delete(id); + } + } } }) .finally(() => { @@ -630,7 +651,12 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage await startChannelInternal(channelId, accountId); }; - const stopChannel = async (channelId: ChannelId, accountId?: string) => { + const stopChannel = async ( + channelId: ChannelId, + accountId?: string, + opts: StopChannelOptions = {}, + ) => { + const manual = opts.manual ?? true; const plugin = getChannelPlugin(channelId); const store = getStore(channelId); // Fast path: nothing running and no explicit plugin shutdown hook to run. @@ -656,7 +682,10 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage if (!abort && !task && !plugin?.gateway?.stopAccount) { return; } - manuallyStopped.add(restartKey(channelId, id)); + const rKey = restartKey(channelId, id); + if (manual) { + manuallyStopped.add(rKey); + } abort?.abort(); const log = ensureChannelLog(channelId); const runtime = ensureChannelRuntime(channelId); @@ -683,12 +712,16 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage ); setRuntime(channelId, id, { accountId: id, - running: true, - restartPending: false, + running: manual, + restartPending: !manual, lastError: `channel stop timed out after ${CHANNEL_STOP_ABORT_TIMEOUT_MS}ms`, }); + if (!manual) { + recoveryStopTimedOut.add(rKey); + } return; } + recoveryStopTimedOut.delete(rKey); store.aborts.delete(id); store.tasks.delete(id); setRuntime(channelId, id, {