mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 11:20:43 +00:00
fix(gateway): dedupe active WebChat sends
Collapse duplicate in-flight internal WebChat text sends onto the active Gateway run so rapid repeat submits do not start fresh `agent:main:main` dispatches. - Add active-run scoped internal text-send dedupe in `chat.send`. - Exclude slash commands, attachments, explicit delivery routes, non-internal origins, and completed runs. - Cover the behavior with a Gateway chat regression test. - Credit both the reporter and BunsDev in the Unreleased changelog entry. Validation: - `pnpm docs:list` - `git diff --check` - `pnpm check:changelog-attributions` - `pnpm exec oxfmt --check --threads=1 src/gateway/server-methods/chat.ts src/gateway/server.chat.gateway-server-chat-b.test.ts` - `pnpm test src/gateway/server.chat.gateway-server-chat-b.test.ts -t "duplicate WebChat" -- --reporter=dot` - Blacksmith Testbox `OPENCLAW_TESTBOX=1 pnpm check:changed` - GitHub PR security/stability checks for head `6884240414997228a136f0fbb85b73a8db4b7fae` Fixes #75737.
This commit is contained in:
@@ -30,6 +30,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Control UI/WebChat: collapse duplicate in-flight internal text sends onto the active Gateway run so rapid repeat submits do not start fresh `agent:main:main` dispatches. Fixes #75737. Thanks @dsdsddd1 and @BunsDev.
|
||||
- Channels/streaming: expose `streaming.progress.label`, `labels`, `maxLines`, and `toolProgress` in bundled channel config metadata so progress draft settings appear in config, docs, and control surfaces. Thanks @vincentkoc.
|
||||
- Channels/streaming: normalize whitespace and case for `streaming.progress.label: "auto"` so progress draft labels keep using the built-in label pool instead of rendering a literal `auto` title. Thanks @vincentkoc.
|
||||
- Gateway/install: prefer supported system Node over nvm/fnm/volta/asdf/mise when regenerating managed gateway services, so `gateway install --force` no longer recreates service definitions that doctor immediately flags as version-manager-backed. Fixes #76339. Thanks @brokemac79.
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
@@ -235,6 +236,40 @@ type ChatSendOriginatingRoute = {
|
||||
explicitDeliverRoute: boolean;
|
||||
};
|
||||
|
||||
const ACTIVE_CHAT_SEND_DEDUPE_PREFIX = "chat:active-send";
|
||||
|
||||
function resolveActiveChatSendRunId(value: unknown): string | null {
|
||||
if (!value || typeof value !== "object" || Array.isArray(value)) {
|
||||
return null;
|
||||
}
|
||||
const runId = (value as { runId?: unknown }).runId;
|
||||
return typeof runId === "string" && runId.trim() ? runId : null;
|
||||
}
|
||||
|
||||
function buildActiveChatSendDedupeKey(params: {
|
||||
attachmentCount: number;
|
||||
explicitDeliverRoute: boolean;
|
||||
message: string;
|
||||
originatingChannel: string;
|
||||
sessionKey: string;
|
||||
}): string | null {
|
||||
const message = params.message.trim();
|
||||
if (
|
||||
!message ||
|
||||
message.startsWith("/") ||
|
||||
params.attachmentCount > 0 ||
|
||||
params.explicitDeliverRoute ||
|
||||
normalizeMessageChannel(params.originatingChannel) !== INTERNAL_MESSAGE_CHANNEL
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
const digest = createHash("sha256")
|
||||
.update(JSON.stringify([params.sessionKey, message]))
|
||||
.digest("hex")
|
||||
.slice(0, 32);
|
||||
return `${ACTIVE_CHAT_SEND_DEDUPE_PREFIX}:${digest}`;
|
||||
}
|
||||
|
||||
type ChatSendExplicitOrigin = {
|
||||
originatingChannel?: string;
|
||||
originatingTo?: string;
|
||||
@@ -2015,6 +2050,35 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
});
|
||||
return;
|
||||
}
|
||||
const clientInfo = client?.connect?.client;
|
||||
const originatingRoute = resolveChatSendOriginatingRoute({
|
||||
client: clientInfo,
|
||||
deliver: p.deliver,
|
||||
entry,
|
||||
explicitOrigin: explicitOriginResult.value,
|
||||
hasConnectedClient: client?.connect !== undefined,
|
||||
mainKey: cfg.session?.mainKey,
|
||||
sessionKey,
|
||||
});
|
||||
const activeChatSendDedupeKey = buildActiveChatSendDedupeKey({
|
||||
attachmentCount: normalizedAttachments.length,
|
||||
explicitDeliverRoute: originatingRoute.explicitDeliverRoute,
|
||||
message: rawMessage,
|
||||
originatingChannel: originatingRoute.originatingChannel,
|
||||
sessionKey,
|
||||
});
|
||||
if (activeChatSendDedupeKey) {
|
||||
const activeRunId = resolveActiveChatSendRunId(
|
||||
context.dedupe.get(activeChatSendDedupeKey)?.payload,
|
||||
);
|
||||
if (activeRunId && context.chatAbortControllers.has(activeRunId)) {
|
||||
respond(true, { runId: activeRunId, status: "in_flight" as const }, undefined, {
|
||||
cached: true,
|
||||
runId: activeRunId,
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
const explicitOriginTargetsPlugin = explicitOriginTargetsPluginBinding(
|
||||
explicitOriginResult.value,
|
||||
);
|
||||
@@ -2097,6 +2161,13 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (activeChatSendDedupeKey) {
|
||||
context.dedupe.set(activeChatSendDedupeKey, {
|
||||
ts: now,
|
||||
ok: true,
|
||||
payload: { runId: clientRunId },
|
||||
});
|
||||
}
|
||||
context.addChatRun(clientRunId, {
|
||||
sessionKey,
|
||||
clientRunId,
|
||||
@@ -2126,22 +2197,13 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
const messageForAgent = systemProvenanceReceipt
|
||||
? [systemProvenanceReceipt, parsedMessage].filter(Boolean).join("\n\n")
|
||||
: parsedMessage;
|
||||
const clientInfo = client?.connect?.client;
|
||||
const {
|
||||
originatingChannel,
|
||||
originatingTo,
|
||||
accountId,
|
||||
messageThreadId,
|
||||
explicitDeliverRoute,
|
||||
} = resolveChatSendOriginatingRoute({
|
||||
client: clientInfo,
|
||||
deliver: p.deliver,
|
||||
entry,
|
||||
explicitOrigin: explicitOriginResult.value,
|
||||
hasConnectedClient: client?.connect !== undefined,
|
||||
mainKey: cfg.session?.mainKey,
|
||||
sessionKey,
|
||||
});
|
||||
} = originatingRoute;
|
||||
// Inject timestamp so agents know the current date/time.
|
||||
// Only BodyForAgent gets the timestamp — Body stays raw for UI display.
|
||||
// See: https://github.com/moltbot/moltbot/issues/3658
|
||||
|
||||
@@ -4,6 +4,7 @@ import path from "node:path";
|
||||
import { afterAll, beforeAll, describe, expect, test, vi } from "vitest";
|
||||
import type { GetReplyOptions } from "../auto-reply/get-reply-options.types.js";
|
||||
import { clearConfigCache } from "../config/config.js";
|
||||
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
|
||||
import { __setMaxChatHistoryMessagesBytesForTest } from "./server-constants.js";
|
||||
import type { GatewayRequestContext, RespondFn } from "./server-methods/shared-types.js";
|
||||
import {
|
||||
@@ -331,6 +332,113 @@ describe("gateway server chat", () => {
|
||||
}
|
||||
});
|
||||
|
||||
test("chat.send reuses an active internal run for duplicate WebChat text sends", async () => {
|
||||
const sessionDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-"));
|
||||
const dispatchRelease = createDeferred<void>();
|
||||
try {
|
||||
testState.sessionStorePath = path.join(sessionDir, "sessions.json");
|
||||
await writeSessionStore({
|
||||
entries: {
|
||||
main: {
|
||||
sessionId: "sess-main",
|
||||
updatedAt: Date.now(),
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const responses: Array<{ id: string; ok: boolean; payload?: unknown; error?: unknown }> = [];
|
||||
const context = {
|
||||
loadGatewayModelCatalog: vi.fn<GatewayRequestContext["loadGatewayModelCatalog"]>(),
|
||||
logGateway: {
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
debug: vi.fn(),
|
||||
},
|
||||
agentRunSeq: new Map<string, number>(),
|
||||
chatAbortControllers: new Map(),
|
||||
chatAbortedRuns: new Map(),
|
||||
chatRunBuffers: new Map(),
|
||||
chatDeltaSentAt: new Map(),
|
||||
chatDeltaLastBroadcastLen: new Map(),
|
||||
addChatRun: vi.fn(),
|
||||
removeChatRun: vi.fn(),
|
||||
broadcast: vi.fn(),
|
||||
nodeSendToSession: vi.fn(),
|
||||
registerToolEventRecipient: vi.fn(),
|
||||
dedupe: new Map(),
|
||||
} as unknown as GatewayRequestContext;
|
||||
dispatchInboundMessageMock.mockImplementation(async () => dispatchRelease.promise);
|
||||
|
||||
const { chatHandlers } = await import("./server-methods/chat.js");
|
||||
const callSend = (id: string, idempotencyKey: string) =>
|
||||
chatHandlers["chat.send"]({
|
||||
req: {
|
||||
type: "req",
|
||||
id,
|
||||
method: "chat.send",
|
||||
params: {
|
||||
sessionKey: "main",
|
||||
message: "?",
|
||||
idempotencyKey,
|
||||
},
|
||||
},
|
||||
params: {
|
||||
sessionKey: "main",
|
||||
message: "?",
|
||||
idempotencyKey,
|
||||
},
|
||||
client: {
|
||||
connect: {
|
||||
client: {
|
||||
id: GATEWAY_CLIENT_NAMES.CONTROL_UI,
|
||||
mode: GATEWAY_CLIENT_MODES.WEBCHAT,
|
||||
},
|
||||
scopes: ["operator.write"],
|
||||
},
|
||||
} as never,
|
||||
isWebchatConnect: () => true,
|
||||
respond: ((ok, payload, error) => {
|
||||
responses.push({ id, ok, payload, error });
|
||||
}) as RespondFn,
|
||||
context,
|
||||
});
|
||||
|
||||
const first = Promise.resolve(callSend("first", "idem-active-a"));
|
||||
await vi.waitFor(() => {
|
||||
expect(responses).toContainEqual({
|
||||
id: "first",
|
||||
ok: true,
|
||||
payload: { runId: "idem-active-a", status: "started" },
|
||||
error: undefined,
|
||||
});
|
||||
}, FAST_WAIT_OPTS);
|
||||
|
||||
await callSend("duplicate", "idem-active-b");
|
||||
|
||||
expect(responses).toContainEqual({
|
||||
id: "duplicate",
|
||||
ok: true,
|
||||
payload: { runId: "idem-active-a", status: "in_flight" },
|
||||
error: undefined,
|
||||
});
|
||||
expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(1);
|
||||
expect(context.addChatRun).toHaveBeenCalledTimes(1);
|
||||
|
||||
dispatchRelease.resolve();
|
||||
await first;
|
||||
await vi.waitFor(() => {
|
||||
expect(context.removeChatRun).toHaveBeenCalledTimes(1);
|
||||
}, FAST_WAIT_OPTS);
|
||||
} finally {
|
||||
dispatchRelease.resolve();
|
||||
dispatchInboundMessageMock.mockReset();
|
||||
testState.sessionStorePath = undefined;
|
||||
clearConfigCache();
|
||||
await fs.rm(sessionDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
test("chat.history backfills claude-cli sessions from Claude project files", async () => {
|
||||
await withGatewayChatHarness(async ({ ws, createSessionDir }) => {
|
||||
await connectOk(ws);
|
||||
|
||||
Reference in New Issue
Block a user