diff --git a/CHANGELOG.md b/CHANGELOG.md index aab93b338d1..13845eb2413 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai - Plugins/doctor: repair missing configured provider and channel plugins from ClawHub before npm fallback, preserving ClawPack metadata in the install record. Thanks @vincentkoc. - Gateway/channels: cap startup fanout at four channel/account handoffs and recover from Bonjour ciao self-probe races, reducing Windows startup stalls with many Telegram accounts. Fixes #75687. - Gateway/sessions: keep `sessions.list` polling responsive on large session stores by reusing list-safe session cache/indexes and returning a lightweight compaction checkpoint preview instead of heavyweight summaries. Thanks @rolandrscheel. +- Control UI/Gateway: keep long-running dashboard WebSocket sessions alive with protocol pings and keep Stop available after reconnect or reload by recovering session-scoped active-run abort state. Fixes #70991. Thanks @alexandre-leng. - CLI/update: treat inherited Gateway service markers as origin hints and only block package replacement when the managed Gateway is still live, so self-updates can stop the service and continue safely. (#75729) Thanks @hxy91819. - Agents/failover: exempt run-level timeouts that fire during tool execution from model fallback, timeout-triggered compaction, and generic timeout payload synthesis. Long `process(poll)`, browser, or `exec` tool calls that exceed `agents.defaults.timeoutSeconds` previously rotated auth profiles, switched to a fallback model, and surfaced a misleading "LLM request timed out" error even though the primary model had already responded. Mirrors the existing `timedOutDuringCompaction` precedent (#46889). Fixes #52147. (#75873) Thanks @simonusa. - Docker: copy Bun 1.3.13 from a digest-pinned image and keep CI on the same version. Fixes #74356. Thanks @fede-kamel and @sallyom. diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index 6945c6c91b6..8a1270de621 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -224,7 +224,10 @@ function shouldAttachPendingMessageSeq(params: { payload: unknown; cached?: bool } function emitSessionsChanged( - context: Pick, + context: Pick< + GatewayRequestContext, + "broadcastToConnIds" | "chatAbortControllers" | "getSessionEventSubscriberConnIds" + >, payload: { sessionKey?: string; reason: string; compacted?: boolean }, ) { const connIds = context.getSessionEventSubscriberConnIds(); @@ -282,6 +285,11 @@ function emitSessionsChanged( modelProvider: sessionRow.modelProvider, model: sessionRow.model, status: sessionRow.status, + hasActiveRun: hasTrackedActiveSessionRun({ + context, + requestedKey: payload.sessionKey ?? sessionRow.key, + canonicalKey: sessionRow.key, + }), startedAt: sessionRow.startedAt, endedAt: sessionRow.endedAt, runtimeMs: sessionRow.runtimeMs, @@ -427,10 +435,13 @@ function resolveAbortSessionKey(params: { } function hasTrackedActiveSessionRun(params: { - context: Pick; + context: Partial>; requestedKey: string; canonicalKey: string; }): boolean { + if (!(params.context.chatAbortControllers instanceof Map)) { + return false; + } for (const active of params.context.chatAbortControllers.values()) { if (active.sessionKey === params.canonicalKey || active.sessionKey === params.requestedKey) { return true; @@ -666,7 +677,22 @@ export const sessionsHandlers: GatewayRequestHandlers = { modelCatalog, opts: p, }); - respond(true, result, undefined); + respond( + true, + { + ...result, + sessions: result.sessions.map((session) => + Object.assign({}, session, { + hasActiveRun: hasTrackedActiveSessionRun({ + context, + requestedKey: session.key, + canonicalKey: session.key, + }), + }), + ), + }, + undefined, + ); }, "sessions.cleanup": async ({ params, respond, context }) => { if (!assertValidParams(params, validateSessionsCleanupParams, "sessions.cleanup", respond)) { diff --git a/src/gateway/server.sessions.list-changed.test.ts b/src/gateway/server.sessions.list-changed.test.ts index c8e02b16d6b..955a53550fe 100644 --- a/src/gateway/server.sessions.list-changed.test.ts +++ b/src/gateway/server.sessions.list-changed.test.ts @@ -157,6 +157,49 @@ test("sessions.list uses the gateway model catalog for effective thinking defaul ); }); +test("sessions.list marks sessions with active abortable runs", async () => { + await createSessionStoreDir(); + await writeSessionStore({ + entries: { + main: sessionStoreEntry("sess-main"), + }, + }); + + const respond = vi.fn(); + const sessionsHandlers = await getSessionsHandlers(); + const { getRuntimeConfig } = await getGatewayConfigModule(); + await sessionsHandlers["sessions.list"]({ + req: { + type: "req", + id: "req-sessions-list-active-run", + method: "sessions.list", + params: {}, + }, + params: {}, + respond, + client: null, + isWebchatConnect: () => false, + context: { + getRuntimeConfig, + loadGatewayModelCatalog: async () => [], + chatAbortControllers: new Map([["run-1", { sessionKey: "agent:main:main" }]]), + } as never, + }); + + expect(respond).toHaveBeenCalledWith( + true, + expect.objectContaining({ + sessions: expect.arrayContaining([ + expect.objectContaining({ + key: "agent:main:main", + hasActiveRun: true, + }), + ]), + }), + undefined, + ); +}); + test("sessions.list yields before responding during bulk transcript hydration", async () => { const { dir } = await createSessionStoreDir(); const entries: Record> = {}; diff --git a/src/gateway/server/ws-connection.test.ts b/src/gateway/server/ws-connection.test.ts index 941f0302e36..c21593cd457 100644 --- a/src/gateway/server/ws-connection.test.ts +++ b/src/gateway/server/ws-connection.test.ts @@ -1,5 +1,5 @@ import { EventEmitter } from "node:events"; -import { beforeEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { WebSocketServer } from "ws"; import type { ResolvedGatewayAuth } from "../auth.js"; @@ -36,6 +36,10 @@ describe("attachGatewayWsConnectionHandler", () => { attachGatewayWsMessageHandlerMock.mockReset(); }); + afterEach(() => { + vi.useRealTimers(); + }); + it("threads current auth getters into the handshake handler instead of a stale snapshot", () => { const listeners = new Map void>(); const wss = { @@ -132,6 +136,7 @@ describe("attachGatewayWsConnectionHandler", () => { port: 19001, canvasHostEnabled: false, resolvedAuth: createResolvedAuth("token"), + preauthHandshakeTimeoutMs: 60_000, gatewayMethods: [], events: [], refreshHealthSnapshot: vi.fn(), @@ -167,4 +172,76 @@ describe("attachGatewayWsConnectionHandler", () => { expect(registered).toBe(false); expect(clients.size).toBe(0); }); + + it("sends protocol pings until the connection closes", () => { + vi.useFakeTimers(); + const listeners = new Map void>(); + const wss = { + on: vi.fn((event: string, handler: (...args: unknown[]) => void) => { + listeners.set(event, handler); + }), + } as unknown as WebSocketServer; + const socket = Object.assign(new EventEmitter(), { + _socket: { + remoteAddress: "127.0.0.1", + remotePort: 1234, + localAddress: "127.0.0.1", + localPort: 5678, + }, + send: vi.fn(), + ping: vi.fn(), + close: vi.fn(), + }); + const upgradeReq = { + headers: { host: "127.0.0.1:19001" }, + socket: { localAddress: "127.0.0.1" }, + }; + + attachGatewayWsConnectionHandler({ + wss, + clients: new Set(), + preauthConnectionBudget: { release: vi.fn() } as never, + port: 19001, + canvasHostEnabled: false, + resolvedAuth: createResolvedAuth("token"), + preauthHandshakeTimeoutMs: 60_000, + gatewayMethods: [], + events: [], + refreshHealthSnapshot: vi.fn(), + logGateway: createLogger() as never, + logHealth: createLogger() as never, + logWsControl: createLogger() as never, + extraHandlers: {}, + broadcast: vi.fn(), + buildRequestContext: () => + ({ + unsubscribeAllSessionEvents: vi.fn(), + nodeRegistry: { unregister: vi.fn() }, + nodeUnsubscribeAll: vi.fn(), + }) as never, + }); + + const onConnection = listeners.get("connection"); + expect(onConnection).toBeTypeOf("function"); + onConnection?.(socket, upgradeReq); + + const passed = attachGatewayWsMessageHandlerMock.mock.calls[0]?.[0] as { + setClient: (client: unknown) => boolean; + }; + expect( + passed.setClient({ + socket, + connect: { client: { id: "openclaw-control-ui", mode: "webchat" } }, + connId: "ping-client", + usesSharedGatewayAuth: false, + }), + ).toBe(true); + + vi.advanceTimersByTime(25_000); + expect(socket.ping).toHaveBeenCalledTimes(1); + + socket.emit("close", 1000, Buffer.from("done")); + vi.advanceTimersByTime(25_000); + expect(socket.ping).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/gateway/server/ws-connection.ts b/src/gateway/server/ws-connection.ts index 4bddf62bbac..95a0bf9ffa3 100644 --- a/src/gateway/server/ws-connection.ts +++ b/src/gateway/server/ws-connection.ts @@ -267,12 +267,17 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti payload: { nonce: connectNonce, ts: Date.now() }, }); + let pingTimer: ReturnType | undefined; + const close = (code = 1000, reason?: string) => { if (closed) { return; } closed = true; clearTimeout(handshakeTimer); + if (pingTimer !== undefined) { + clearInterval(pingTimer); + } releasePreauthBudget(); if (client) { clients.delete(client); @@ -423,6 +428,13 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti releasePreauthBudget(); client = next; clients.add(next); + pingTimer = setInterval(() => { + try { + socket.ping(); + } catch { + // close() clears the timer; ping can race with a socket already entering CLOSING. + } + }, 25_000); return true; }, setHandshakeState: (next) => { diff --git a/src/gateway/session-utils.types.ts b/src/gateway/session-utils.types.ts index a36722af81d..91f22b69f5c 100644 --- a/src/gateway/session-utils.types.ts +++ b/src/gateway/session-utils.types.ts @@ -71,6 +71,7 @@ export type GatewaySessionRow = { totalTokensFresh?: boolean; estimatedCostUsd?: number; status?: SessionRunStatus; + hasActiveRun?: boolean; subagentRunState?: SubagentRunState; hasActiveSubagentRun?: boolean; startedAt?: number; diff --git a/ui/src/ui/app-chat.test.ts b/ui/src/ui/app-chat.test.ts index 9ba7855c1dc..12893fbd365 100644 --- a/ui/src/ui/app-chat.test.ts +++ b/ui/src/ui/app-chat.test.ts @@ -1024,6 +1024,24 @@ describe("handleAbortChat", () => { expect(host.chatRunId).toBe("run-main"); }); + it("queues a session-scoped abort while disconnected after active run state is recovered", async () => { + const host = makeHost({ + connected: false, + chatRunId: null, + chatMessage: "draft", + sessionKey: "agent:main", + sessionsResult: createSessionsResult([ + row("agent:main", { hasActiveRun: true }), + row("agent:other", { hasActiveRun: true }), + ]), + }); + + await handleAbortChat(host); + + expect(host.pendingAbort).toEqual({ runId: null, sessionKey: "agent:main" }); + expect(host.chatMessage).toBe(""); + }); + it("keeps the draft when disconnected without an active run", async () => { const host = makeHost({ connected: false, diff --git a/ui/src/ui/app-chat.ts b/ui/src/ui/app-chat.ts index c376bdaf4aa..1c458005687 100644 --- a/ui/src/ui/app-chat.ts +++ b/ui/src/ui/app-chat.ts @@ -66,7 +66,7 @@ export type ChatHost = ChatInputHistoryState & { sessionsResult?: SessionsListResult | null; updateComplete?: Promise; refreshSessionsAfterChat: Set; - pendingAbort?: { runId: string; sessionKey: string } | null; + pendingAbort?: { runId?: string | null; sessionKey: string } | null; chatSubmitGuards?: Map>; /** Callback for slash-command side effects that need app-level access. */ onSlashAction?: (action: string) => void | Promise; @@ -90,6 +90,21 @@ export function isChatBusy(host: ChatHost) { return host.chatSending || Boolean(host.chatRunId); } +export function hasAbortableSessionRun(host: { + chatRunId?: string | null; + sessionKey: string; + sessionsResult?: SessionsListResult | null; +}): boolean { + if (host.chatRunId) { + return true; + } + return Boolean( + host.sessionsResult?.sessions.some( + (session) => session.key === host.sessionKey && session.hasActiveRun === true, + ), + ); +} + export function isChatStopCommand(text: string) { const trimmed = text.trim(); if (!trimmed) { @@ -135,11 +150,12 @@ function isBtwCommand(text: string) { } export async function handleAbortChat(host: ChatHost) { - // If disconnected but we have an active runId, queue the abort for when we reconnect - if (!host.connected && host.chatRunId) { + const activeRunId = host.chatRunId; + // If disconnected but this session is abortable, queue the abort for when we reconnect. + if (!host.connected && hasAbortableSessionRun(host)) { host.chatMessage = ""; resetChatInputHistoryNavigation(host); - host.pendingAbort = { runId: host.chatRunId, sessionKey: host.sessionKey }; + host.pendingAbort = { runId: activeRunId, sessionKey: host.sessionKey }; return; } if (!host.connected) { diff --git a/ui/src/ui/app-gateway.node.test.ts b/ui/src/ui/app-gateway.node.test.ts index c44b4ef0f90..21b43965fa2 100644 --- a/ui/src/ui/app-gateway.node.test.ts +++ b/ui/src/ui/app-gateway.node.test.ts @@ -679,6 +679,23 @@ describe("connectGateway", () => { expect(host.chatStream).toBeNull(); }); + it("sends queued session-scoped chat aborts after reconnect", async () => { + const host = createHost(); + host.pendingAbort = { sessionKey: "main" }; + + connectGateway(host); + const client = gatewayClientInstances[0]; + expect(client).toBeDefined(); + + client.emitHello(); + await Promise.resolve(); + + expect(client.request).toHaveBeenCalledWith("chat.abort", { + sessionKey: "main", + }); + expect(host.pendingAbort).toBeNull(); + }); + it("logs and drops stale queued chat abort failures after reconnect", async () => { const host = createHost(); host.pendingAbort = { runId: "run-stale", sessionKey: "main" }; diff --git a/ui/src/ui/app-gateway.ts b/ui/src/ui/app-gateway.ts index fafe0d3367d..ce19800fee7 100644 --- a/ui/src/ui/app-gateway.ts +++ b/ui/src/ui/app-gateway.ts @@ -102,7 +102,7 @@ type GatewayHost = { updateStatusBanner: { tone: "danger" | "warn" | "info"; text: string } | null; sessionKey: string; chatRunId: string | null; - pendingAbort?: { runId: string; sessionKey: string } | null; + pendingAbort?: { runId?: string | null; sessionKey: string } | null; refreshSessionsAfterChat: Set; execApprovalQueue: ExecApprovalRequest[]; execApprovalError: string | null; @@ -439,10 +439,12 @@ export function connectGateway(host: GatewayHost, options?: ConnectGatewayOption const abort = host.pendingAbort; host.pendingAbort = null; void host.client - .request("chat.abort", { - sessionKey: abort.sessionKey, - runId: abort.runId, - }) + .request( + "chat.abort", + abort.runId + ? { sessionKey: abort.sessionKey, runId: abort.runId } + : { sessionKey: abort.sessionKey }, + ) .catch((err) => { // Log to console for diagnostics; user sees no feedback for a stale abort // since the run likely completed during the disconnect window anyway. diff --git a/ui/src/ui/app-render.ts b/ui/src/ui/app-render.ts index 720f881cc94..c73772b00e8 100644 --- a/ui/src/ui/app-render.ts +++ b/ui/src/ui/app-render.ts @@ -2,7 +2,7 @@ import { html, nothing } from "lit"; import { styleMap } from "lit/directives/style-map.js"; import { t } from "../i18n/index.ts"; import { getSafeLocalStorage } from "../local-storage.ts"; -import { refreshChat } from "./app-chat.ts"; +import { hasAbortableSessionRun, refreshChat } from "./app-chat.ts"; import { DEFAULT_CRON_FORM } from "./app-defaults.ts"; import { renderUsageTab } from "./app-render-usage-tab.ts"; import { @@ -2373,7 +2373,7 @@ export function renderApp(state: AppViewState) { onSend: () => state.handleSendChat(), onCompact: () => state.handleSendChat("/compact", { restoreDraft: true }), onToggleRealtimeTalk: () => state.toggleRealtimeTalk(), - canAbort: Boolean(state.chatRunId), + canAbort: hasAbortableSessionRun(state), onAbort: () => void state.handleAbortChat(), onQueueRemove: (id) => state.removeQueuedMessage(id), onQueueSteer: (id) => void state.steerQueuedChatMessage(id), diff --git a/ui/src/ui/controllers/sessions.ts b/ui/src/ui/controllers/sessions.ts index 4b46b8af559..fb97c60ef97 100644 --- a/ui/src/ui/controllers/sessions.ts +++ b/ui/src/ui/controllers/sessions.ts @@ -65,6 +65,7 @@ const SESSION_EVENT_ROW_FIELDS = [ "endedAt", "elevatedLevel", "fastMode", + "hasActiveRun", "inputTokens", "kind", "label", diff --git a/ui/src/ui/types.ts b/ui/src/ui/types.ts index 63f9fa5702a..dd892c0e4a3 100644 --- a/ui/src/ui/types.ts +++ b/ui/src/ui/types.ts @@ -441,6 +441,7 @@ export type GatewaySessionRow = { totalTokens?: number; totalTokensFresh?: boolean; status?: SessionRunStatus; + hasActiveRun?: boolean; subagentRunState?: SubagentRunState; hasActiveSubagentRun?: boolean; startedAt?: number;