From c191dc992884041e5559e0c235d2f335740ebf0f Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Sun, 29 Mar 2026 19:47:53 -0400 Subject: [PATCH] Control UI: preserve seq-gap reconnect state --- ui/src/ui/app-gateway.node.test.ts | 65 ++++++++++++++++++++++++++++++ ui/src/ui/app-gateway.ts | 33 +++++++++++++-- 2 files changed, 95 insertions(+), 3 deletions(-) diff --git a/ui/src/ui/app-gateway.node.test.ts b/ui/src/ui/app-gateway.node.test.ts index 2b908cc672d..862495a16f8 100644 --- a/ui/src/ui/app-gateway.node.test.ts +++ b/ui/src/ui/app-gateway.node.test.ts @@ -9,6 +9,7 @@ const loadChatHistoryMock = vi.hoisted(() => vi.fn(async () => undefined)); type GatewayClientMock = { start: ReturnType; stop: ReturnType; + request: ReturnType; options: { clientVersion?: string }; emitHello: (hello?: GatewayHelloOk) => void; emitClose: (info: { @@ -39,6 +40,7 @@ vi.mock("./gateway.ts", async (importOriginal) => { class GatewayBrowserClient { readonly start = vi.fn(); readonly stop = vi.fn(); + readonly request = vi.fn(async () => ({})); constructor( private opts: { @@ -56,6 +58,7 @@ vi.mock("./gateway.ts", async (importOriginal) => { gatewayClientInstances.push({ start: this.start, stop: this.stop, + request: this.request, options: { clientVersion: this.opts.clientVersion }, emitHello: (hello) => { this.opts.onHello?.( @@ -205,6 +208,68 @@ describe("connectGateway", () => { expect(host.lastError).toBeNull(); }); + it("preserves approval prompts, clears stale run indicators, and resumes queued work after seq-gap reconnect", () => { + const host = createHost(); + const chatHost = host as typeof host & { + chatRunId: string | null; + chatQueue: Array<{ + id: string; + text: string; + createdAt: number; + pendingRunId?: string; + }>; + }; + chatHost.chatRunId = "run-1"; + chatHost.chatQueue = [ + { + id: "pending", + text: "/steer tighten the plan", + createdAt: 1, + pendingRunId: "run-1", + }, + { + id: "queued", + text: "follow up", + createdAt: 2, + }, + ]; + host.execApprovalQueue = [ + { + id: "approval-1", + kind: "exec", + request: { command: "rm -rf /tmp/demo" }, + createdAtMs: Date.now(), + expiresAtMs: Date.now() + 60_000, + }, + ]; + + connectGateway(host); + const client = gatewayClientInstances[0]; + expect(client).toBeDefined(); + + client.emitGap(20, 24); + + expect(gatewayClientInstances).toHaveLength(2); + expect(host.execApprovalQueue).toHaveLength(1); + expect(host.execApprovalQueue[0]?.id).toBe("approval-1"); + expect(chatHost.chatQueue).toHaveLength(1); + expect(chatHost.chatQueue[0]?.text).toBe("follow up"); + + const reconnectClient = gatewayClientInstances[1]; + expect(reconnectClient).toBeDefined(); + + reconnectClient.emitHello(); + + expect(reconnectClient.request).toHaveBeenCalledWith("chat.send", { + sessionKey: "main", + message: "follow up", + deliver: false, + idempotencyKey: expect.any(String), + attachments: undefined, + }); + expect(chatHost.chatQueue).toHaveLength(0); + }); + it("ignores stale client onEvent callbacks after reconnect", () => { const host = createHost(); diff --git a/ui/src/ui/app-gateway.ts b/ui/src/ui/app-gateway.ts index 30097da1c14..7368043144b 100644 --- a/ui/src/ui/app-gateway.ts +++ b/ui/src/ui/app-gateway.ts @@ -29,6 +29,7 @@ import { parseExecApprovalRequested, parseExecApprovalResolved, parsePluginApprovalRequested, + pruneExecApprovalQueue, removeExecApproval, } from "./controllers/exec-approval.ts"; import { loadHealthState } from "./controllers/health.ts"; @@ -98,6 +99,11 @@ type SessionDefaultsSnapshot = { type GatewayHostWithShutdownMessage = GatewayHost & { pendingShutdownMessage?: string | null; + resumeChatQueueAfterReconnect?: boolean; +}; + +type ConnectGatewayOptions = { + reason?: "initial" | "seq-gap"; }; export function resolveControlUiClientVersion(params: { @@ -179,14 +185,27 @@ function applySessionDefaults(host: GatewayHost, defaults?: SessionDefaultsSnaps } } -export function connectGateway(host: GatewayHost) { +export function connectGateway(host: GatewayHost, options?: ConnectGatewayOptions) { const shutdownHost = host as GatewayHostWithShutdownMessage; + const reconnectReason = options?.reason ?? "initial"; shutdownHost.pendingShutdownMessage = null; + shutdownHost.resumeChatQueueAfterReconnect = false; host.lastError = null; host.lastErrorCode = null; host.hello = null; host.connected = false; - host.execApprovalQueue = []; + if (reconnectReason === "seq-gap") { + // A seq gap means the socket stayed on the same gateway; preserve prompts + // that only arrived as ephemeral events and clear stale run-scoped indicators. + host.execApprovalQueue = pruneExecApprovalQueue(host.execApprovalQueue); + clearPendingQueueItemsForRun( + host as unknown as Parameters[0], + host.chatRunId ?? undefined, + ); + shutdownHost.resumeChatQueueAfterReconnect = true; + } else { + host.execApprovalQueue = []; + } host.execApprovalError = null; const previousClient = host.client; @@ -218,6 +237,14 @@ export function connectGateway(host: GatewayHost) { (host as unknown as { chatStream: string | null }).chatStream = null; (host as unknown as { chatStreamStartedAt: number | null }).chatStreamStartedAt = null; resetToolStream(host as unknown as Parameters[0]); + if (shutdownHost.resumeChatQueueAfterReconnect) { + // The interrupted run will never emit its terminal event now that the + // old client is gone, so resume any deferred commands after hello. + shutdownHost.resumeChatQueueAfterReconnect = false; + void flushChatQueueForEvent( + host as unknown as Parameters[0], + ); + } void subscribeSessions(host as unknown as OpenClawApp); void loadAssistantIdentity(host as unknown as OpenClawApp); void loadAgents(host as unknown as OpenClawApp); @@ -266,7 +293,7 @@ export function connectGateway(host: GatewayHost) { } host.lastError = `event gap detected (expected seq ${expected}, got ${received}); reconnecting`; host.lastErrorCode = null; - connectGateway(host); + connectGateway(host, { reason: "seq-gap" }); }, }); host.client = client;