From bd2f8560fee61788c76d9bb5eb99912b7932b010 Mon Sep 17 00:00:00 2001 From: Val Alexander Date: Sun, 3 May 2026 16:30:17 -0500 Subject: [PATCH] fix(gateway): dedupe active WebChat sends Collapse duplicate in-flight internal WebChat text sends onto the active Gateway run so rapid repeat submits do not start fresh `agent:main:main` dispatches. - Add active-run scoped internal text-send dedupe in `chat.send`. - Exclude slash commands, attachments, explicit delivery routes, non-internal origins, and completed runs. - Cover the behavior with a Gateway chat regression test. - Credit both the reporter and BunsDev in the Unreleased changelog entry. Validation: - `pnpm docs:list` - `git diff --check` - `pnpm check:changelog-attributions` - `pnpm exec oxfmt --check --threads=1 src/gateway/server-methods/chat.ts src/gateway/server.chat.gateway-server-chat-b.test.ts` - `pnpm test src/gateway/server.chat.gateway-server-chat-b.test.ts -t "duplicate WebChat" -- --reporter=dot` - Blacksmith Testbox `OPENCLAW_TESTBOX=1 pnpm check:changed` - GitHub PR security/stability checks for head `6884240414997228a136f0fbb85b73a8db4b7fae` Fixes #75737. --- CHANGELOG.md | 1 + src/gateway/server-methods/chat.ts | 82 +++++++++++-- .../server.chat.gateway-server-chat-b.test.ts | 108 ++++++++++++++++++ 3 files changed, 181 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 44041c62b13..5341435eb6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Control UI/WebChat: collapse duplicate in-flight internal text sends onto the active Gateway run so rapid repeat submits do not start fresh `agent:main:main` dispatches. Fixes #75737. Thanks @dsdsddd1 and @BunsDev. - Channels/streaming: expose `streaming.progress.label`, `labels`, `maxLines`, and `toolProgress` in bundled channel config metadata so progress draft settings appear in config, docs, and control surfaces. Thanks @vincentkoc. - Channels/streaming: normalize whitespace and case for `streaming.progress.label: "auto"` so progress draft labels keep using the built-in label pool instead of rendering a literal `auto` title. Thanks @vincentkoc. - Gateway/install: prefer supported system Node over nvm/fnm/volta/asdf/mise when regenerating managed gateway services, so `gateway install --force` no longer recreates service definitions that doctor immediately flags as version-manager-backed. Fixes #76339. Thanks @brokemac79. diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 7ce6daeb332..e5e6cc94794 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -1,3 +1,4 @@ +import { createHash } from "node:crypto"; import fs from "node:fs"; import path from "node:path"; import type { AgentMessage } from "@mariozechner/pi-agent-core"; @@ -235,6 +236,40 @@ type ChatSendOriginatingRoute = { explicitDeliverRoute: boolean; }; +const ACTIVE_CHAT_SEND_DEDUPE_PREFIX = "chat:active-send"; + +function resolveActiveChatSendRunId(value: unknown): string | null { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return null; + } + const runId = (value as { runId?: unknown }).runId; + return typeof runId === "string" && runId.trim() ? runId : null; +} + +function buildActiveChatSendDedupeKey(params: { + attachmentCount: number; + explicitDeliverRoute: boolean; + message: string; + originatingChannel: string; + sessionKey: string; +}): string | null { + const message = params.message.trim(); + if ( + !message || + message.startsWith("/") || + params.attachmentCount > 0 || + params.explicitDeliverRoute || + normalizeMessageChannel(params.originatingChannel) !== INTERNAL_MESSAGE_CHANNEL + ) { + return null; + } + const digest = createHash("sha256") + .update(JSON.stringify([params.sessionKey, message])) + .digest("hex") + .slice(0, 32); + return `${ACTIVE_CHAT_SEND_DEDUPE_PREFIX}:${digest}`; +} + type ChatSendExplicitOrigin = { originatingChannel?: string; originatingTo?: string; @@ -2015,6 +2050,35 @@ export const chatHandlers: GatewayRequestHandlers = { }); return; } + const clientInfo = client?.connect?.client; + const originatingRoute = resolveChatSendOriginatingRoute({ + client: clientInfo, + deliver: p.deliver, + entry, + explicitOrigin: explicitOriginResult.value, + hasConnectedClient: client?.connect !== undefined, + mainKey: cfg.session?.mainKey, + sessionKey, + }); + const activeChatSendDedupeKey = buildActiveChatSendDedupeKey({ + attachmentCount: normalizedAttachments.length, + explicitDeliverRoute: originatingRoute.explicitDeliverRoute, + message: rawMessage, + originatingChannel: originatingRoute.originatingChannel, + sessionKey, + }); + if (activeChatSendDedupeKey) { + const activeRunId = resolveActiveChatSendRunId( + context.dedupe.get(activeChatSendDedupeKey)?.payload, + ); + if (activeRunId && context.chatAbortControllers.has(activeRunId)) { + respond(true, { runId: activeRunId, status: "in_flight" as const }, undefined, { + cached: true, + runId: activeRunId, + }); + return; + } + } const explicitOriginTargetsPlugin = explicitOriginTargetsPluginBinding( explicitOriginResult.value, ); @@ -2097,6 +2161,13 @@ export const chatHandlers: GatewayRequestHandlers = { }); return; } + if (activeChatSendDedupeKey) { + context.dedupe.set(activeChatSendDedupeKey, { + ts: now, + ok: true, + payload: { runId: clientRunId }, + }); + } context.addChatRun(clientRunId, { sessionKey, clientRunId, @@ -2126,22 +2197,13 @@ export const chatHandlers: GatewayRequestHandlers = { const messageForAgent = systemProvenanceReceipt ? [systemProvenanceReceipt, parsedMessage].filter(Boolean).join("\n\n") : parsedMessage; - const clientInfo = client?.connect?.client; const { originatingChannel, originatingTo, accountId, messageThreadId, explicitDeliverRoute, - } = resolveChatSendOriginatingRoute({ - client: clientInfo, - deliver: p.deliver, - entry, - explicitOrigin: explicitOriginResult.value, - hasConnectedClient: client?.connect !== undefined, - mainKey: cfg.session?.mainKey, - sessionKey, - }); + } = originatingRoute; // Inject timestamp so agents know the current date/time. // Only BodyForAgent gets the timestamp — Body stays raw for UI display. // See: https://github.com/moltbot/moltbot/issues/3658 diff --git a/src/gateway/server.chat.gateway-server-chat-b.test.ts b/src/gateway/server.chat.gateway-server-chat-b.test.ts index aad7f9f66c4..eab2c1eda32 100644 --- a/src/gateway/server.chat.gateway-server-chat-b.test.ts +++ b/src/gateway/server.chat.gateway-server-chat-b.test.ts @@ -4,6 +4,7 @@ import path from "node:path"; import { afterAll, beforeAll, describe, expect, test, vi } from "vitest"; import type { GetReplyOptions } from "../auto-reply/get-reply-options.types.js"; import { clearConfigCache } from "../config/config.js"; +import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js"; import { __setMaxChatHistoryMessagesBytesForTest } from "./server-constants.js"; import type { GatewayRequestContext, RespondFn } from "./server-methods/shared-types.js"; import { @@ -331,6 +332,113 @@ describe("gateway server chat", () => { } }); + test("chat.send reuses an active internal run for duplicate WebChat text sends", async () => { + const sessionDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); + const dispatchRelease = createDeferred(); + try { + testState.sessionStorePath = path.join(sessionDir, "sessions.json"); + await writeSessionStore({ + entries: { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + }); + + const responses: Array<{ id: string; ok: boolean; payload?: unknown; error?: unknown }> = []; + const context = { + loadGatewayModelCatalog: vi.fn(), + logGateway: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }, + agentRunSeq: new Map(), + chatAbortControllers: new Map(), + chatAbortedRuns: new Map(), + chatRunBuffers: new Map(), + chatDeltaSentAt: new Map(), + chatDeltaLastBroadcastLen: new Map(), + addChatRun: vi.fn(), + removeChatRun: vi.fn(), + broadcast: vi.fn(), + nodeSendToSession: vi.fn(), + registerToolEventRecipient: vi.fn(), + dedupe: new Map(), + } as unknown as GatewayRequestContext; + dispatchInboundMessageMock.mockImplementation(async () => dispatchRelease.promise); + + const { chatHandlers } = await import("./server-methods/chat.js"); + const callSend = (id: string, idempotencyKey: string) => + chatHandlers["chat.send"]({ + req: { + type: "req", + id, + method: "chat.send", + params: { + sessionKey: "main", + message: "?", + idempotencyKey, + }, + }, + params: { + sessionKey: "main", + message: "?", + idempotencyKey, + }, + client: { + connect: { + client: { + id: GATEWAY_CLIENT_NAMES.CONTROL_UI, + mode: GATEWAY_CLIENT_MODES.WEBCHAT, + }, + scopes: ["operator.write"], + }, + } as never, + isWebchatConnect: () => true, + respond: ((ok, payload, error) => { + responses.push({ id, ok, payload, error }); + }) as RespondFn, + context, + }); + + const first = Promise.resolve(callSend("first", "idem-active-a")); + await vi.waitFor(() => { + expect(responses).toContainEqual({ + id: "first", + ok: true, + payload: { runId: "idem-active-a", status: "started" }, + error: undefined, + }); + }, FAST_WAIT_OPTS); + + await callSend("duplicate", "idem-active-b"); + + expect(responses).toContainEqual({ + id: "duplicate", + ok: true, + payload: { runId: "idem-active-a", status: "in_flight" }, + error: undefined, + }); + expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(1); + expect(context.addChatRun).toHaveBeenCalledTimes(1); + + dispatchRelease.resolve(); + await first; + await vi.waitFor(() => { + expect(context.removeChatRun).toHaveBeenCalledTimes(1); + }, FAST_WAIT_OPTS); + } finally { + dispatchRelease.resolve(); + dispatchInboundMessageMock.mockReset(); + testState.sessionStorePath = undefined; + clearConfigCache(); + await fs.rm(sessionDir, { recursive: true, force: true }); + } + }); + test("chat.history backfills claude-cli sessions from Claude project files", async () => { await withGatewayChatHarness(async ({ ws, createSessionDir }) => { await connectOk(ws);