Control UI: preserve seq-gap reconnect state

This commit is contained in:
Gustavo Madeira Santana
2026-03-29 19:47:53 -04:00
parent cf84a03ecf
commit c191dc9928
2 changed files with 95 additions and 3 deletions

View File

@@ -9,6 +9,7 @@ const loadChatHistoryMock = vi.hoisted(() => vi.fn(async () => undefined));
type GatewayClientMock = {
start: ReturnType<typeof vi.fn>;
stop: ReturnType<typeof vi.fn>;
request: ReturnType<typeof vi.fn>;
options: { clientVersion?: string };
emitHello: (hello?: GatewayHelloOk) => void;
emitClose: (info: {
@@ -39,6 +40,7 @@ vi.mock("./gateway.ts", async (importOriginal) => {
class GatewayBrowserClient {
readonly start = vi.fn();
readonly stop = vi.fn();
readonly request = vi.fn(async () => ({}));
constructor(
private opts: {
@@ -56,6 +58,7 @@ vi.mock("./gateway.ts", async (importOriginal) => {
gatewayClientInstances.push({
start: this.start,
stop: this.stop,
request: this.request,
options: { clientVersion: this.opts.clientVersion },
emitHello: (hello) => {
this.opts.onHello?.(
@@ -205,6 +208,68 @@ describe("connectGateway", () => {
expect(host.lastError).toBeNull();
});
it("preserves approval prompts, clears stale run indicators, and resumes queued work after seq-gap reconnect", () => {
const host = createHost();
const chatHost = host as typeof host & {
chatRunId: string | null;
chatQueue: Array<{
id: string;
text: string;
createdAt: number;
pendingRunId?: string;
}>;
};
chatHost.chatRunId = "run-1";
chatHost.chatQueue = [
{
id: "pending",
text: "/steer tighten the plan",
createdAt: 1,
pendingRunId: "run-1",
},
{
id: "queued",
text: "follow up",
createdAt: 2,
},
];
host.execApprovalQueue = [
{
id: "approval-1",
kind: "exec",
request: { command: "rm -rf /tmp/demo" },
createdAtMs: Date.now(),
expiresAtMs: Date.now() + 60_000,
},
];
connectGateway(host);
const client = gatewayClientInstances[0];
expect(client).toBeDefined();
client.emitGap(20, 24);
expect(gatewayClientInstances).toHaveLength(2);
expect(host.execApprovalQueue).toHaveLength(1);
expect(host.execApprovalQueue[0]?.id).toBe("approval-1");
expect(chatHost.chatQueue).toHaveLength(1);
expect(chatHost.chatQueue[0]?.text).toBe("follow up");
const reconnectClient = gatewayClientInstances[1];
expect(reconnectClient).toBeDefined();
reconnectClient.emitHello();
expect(reconnectClient.request).toHaveBeenCalledWith("chat.send", {
sessionKey: "main",
message: "follow up",
deliver: false,
idempotencyKey: expect.any(String),
attachments: undefined,
});
expect(chatHost.chatQueue).toHaveLength(0);
});
it("ignores stale client onEvent callbacks after reconnect", () => {
const host = createHost();

View File

@@ -29,6 +29,7 @@ import {
parseExecApprovalRequested,
parseExecApprovalResolved,
parsePluginApprovalRequested,
pruneExecApprovalQueue,
removeExecApproval,
} from "./controllers/exec-approval.ts";
import { loadHealthState } from "./controllers/health.ts";
@@ -98,6 +99,11 @@ type SessionDefaultsSnapshot = {
type GatewayHostWithShutdownMessage = GatewayHost & {
pendingShutdownMessage?: string | null;
resumeChatQueueAfterReconnect?: boolean;
};
type ConnectGatewayOptions = {
reason?: "initial" | "seq-gap";
};
export function resolveControlUiClientVersion(params: {
@@ -179,14 +185,27 @@ function applySessionDefaults(host: GatewayHost, defaults?: SessionDefaultsSnaps
}
}
export function connectGateway(host: GatewayHost) {
export function connectGateway(host: GatewayHost, options?: ConnectGatewayOptions) {
const shutdownHost = host as GatewayHostWithShutdownMessage;
const reconnectReason = options?.reason ?? "initial";
shutdownHost.pendingShutdownMessage = null;
shutdownHost.resumeChatQueueAfterReconnect = false;
host.lastError = null;
host.lastErrorCode = null;
host.hello = null;
host.connected = false;
host.execApprovalQueue = [];
if (reconnectReason === "seq-gap") {
// A seq gap means the socket stayed on the same gateway; preserve prompts
// that only arrived as ephemeral events and clear stale run-scoped indicators.
host.execApprovalQueue = pruneExecApprovalQueue(host.execApprovalQueue);
clearPendingQueueItemsForRun(
host as unknown as Parameters<typeof clearPendingQueueItemsForRun>[0],
host.chatRunId ?? undefined,
);
shutdownHost.resumeChatQueueAfterReconnect = true;
} else {
host.execApprovalQueue = [];
}
host.execApprovalError = null;
const previousClient = host.client;
@@ -218,6 +237,14 @@ export function connectGateway(host: GatewayHost) {
(host as unknown as { chatStream: string | null }).chatStream = null;
(host as unknown as { chatStreamStartedAt: number | null }).chatStreamStartedAt = null;
resetToolStream(host as unknown as Parameters<typeof resetToolStream>[0]);
if (shutdownHost.resumeChatQueueAfterReconnect) {
// The interrupted run will never emit its terminal event now that the
// old client is gone, so resume any deferred commands after hello.
shutdownHost.resumeChatQueueAfterReconnect = false;
void flushChatQueueForEvent(
host as unknown as Parameters<typeof flushChatQueueForEvent>[0],
);
}
void subscribeSessions(host as unknown as OpenClawApp);
void loadAssistantIdentity(host as unknown as OpenClawApp);
void loadAgents(host as unknown as OpenClawApp);
@@ -266,7 +293,7 @@ export function connectGateway(host: GatewayHost) {
}
host.lastError = `event gap detected (expected seq ${expected}, got ${received}); reconnecting`;
host.lastErrorCode = null;
connectGateway(host);
connectGateway(host, { reason: "seq-gap" });
},
});
host.client = client;