diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 06a3407d98a..bfe175bdec9 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -521,6 +521,7 @@ function buildActiveChatSendDedupeKey(params: { message: string; originatingChannel: string; sessionKey: string; + systemScope?: string; }): string | null { const message = params.message.trim(); if ( @@ -532,8 +533,11 @@ function buildActiveChatSendDedupeKey(params: { ) { return null; } + const dedupeParts = params.systemScope?.trim() + ? [params.sessionKey, message, params.systemScope.trim()] + : [params.sessionKey, message]; const digest = createHash("sha256") - .update(JSON.stringify([params.sessionKey, message])) + .update(JSON.stringify(dedupeParts)) .digest("hex") .slice(0, 32); return `${ACTIVE_CHAT_SEND_DEDUPE_PREFIX}:${digest}`; @@ -3116,6 +3120,10 @@ export const chatHandlers: GatewayRequestHandlers = { const inboundMessage = sanitizedMessageResult.message; const systemInputProvenance = normalizeInputProvenance(p.systemInputProvenance); const systemProvenanceReceipt = systemReceiptResult.receipt; + const systemDedupeScope = + systemInputProvenance || systemProvenanceReceipt + ? JSON.stringify([systemProvenanceReceipt ?? null, systemInputProvenance ?? null]) + : undefined; const stopCommand = isChatStopCommandText(inboundMessage); const normalizedAttachments = normalizeRpcAttachmentsToChatAttachments(p.attachments); const rawMessage = inboundMessage.trim(); @@ -3284,6 +3292,7 @@ export const chatHandlers: GatewayRequestHandlers = { message: rawMessage, originatingChannel: originatingRoute.originatingChannel, sessionKey: activeRunScopeKey, + systemScope: systemDedupeScope, }); if (activeChatSendDedupeKey) { const activeRunId = resolveActiveChatSendRunId( diff --git a/src/gateway/server.chat.gateway-server-chat-b.test.ts b/src/gateway/server.chat.gateway-server-chat-b.test.ts index 460576fdeac..a2b08c6763a 100644 --- a/src/gateway/server.chat.gateway-server-chat-b.test.ts +++ b/src/gateway/server.chat.gateway-server-chat-b.test.ts @@ -784,7 +784,7 @@ describe("gateway server chat", () => { }, ); - test("chat.send reuses an active internal run for duplicate WebChat text sends", async () => { + test("chat.send reuses only active WebChat text sends with the same system context", async () => { const sessionDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); const dispatchRelease = createDeferred(); try { @@ -827,7 +827,7 @@ describe("gateway server chat", () => { dispatchInboundMessageMock.mockImplementation(async () => dispatchRelease.promise); const { chatHandlers } = await import("./server-methods/chat.js"); - const callSend = (id: string, idempotencyKey: string) => + const callSend = (id: string, idempotencyKey: string, systemProvenanceReceipt?: string) => chatHandlers["chat.send"]({ req: { type: "req", @@ -837,12 +837,14 @@ describe("gateway server chat", () => { sessionKey: "main", message: "?", idempotencyKey, + ...(systemProvenanceReceipt ? { systemProvenanceReceipt } : {}), }, }, params: { sessionKey: "main", message: "?", idempotencyKey, + ...(systemProvenanceReceipt ? { systemProvenanceReceipt } : {}), }, client: { connect: { @@ -850,7 +852,7 @@ describe("gateway server chat", () => { id: GATEWAY_CLIENT_NAMES.CONTROL_UI, mode: GATEWAY_CLIENT_MODES.WEBCHAT, }, - scopes: ["operator.write"], + scopes: ["operator.write", "operator.admin"], }, } as never, isWebchatConnect: () => true, @@ -908,10 +910,56 @@ describe("gateway server chat", () => { expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(1); expect(context.addChatRun).toHaveBeenCalledTimes(1); + const withSystemContext = Promise.resolve( + callSend("system-context", "idem-active-c", "proposal=support-file-sampler-b"), + ); + + await vi.waitFor( + () => { + expect(responses).toEqual([ + { + id: "first", + ok: true, + payload: expect.objectContaining({ + runId: "idem-active-a", + status: "started", + serverTiming: { + receivedToAckMs: expect.any(Number), + loadSessionMs: expect.any(Number), + }, + }), + error: undefined, + }, + { + id: "duplicate", + ok: true, + payload: { runId: "idem-active-a", status: "in_flight" }, + error: undefined, + }, + { + id: "system-context", + ok: true, + payload: expect.objectContaining({ + runId: "idem-active-c", + status: "started", + serverTiming: { + receivedToAckMs: expect.any(Number), + loadSessionMs: expect.any(Number), + }, + }), + error: undefined, + }, + ]); + }, + { timeout: 2_000, interval: 5 }, + ); + expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(2); + expect(context.addChatRun).toHaveBeenCalledTimes(2); + dispatchRelease.resolve(); - await first; + await Promise.all([first, withSystemContext]); await vi.waitFor(() => { - expect(context.removeChatRun).toHaveBeenCalledTimes(1); + expect(context.removeChatRun).toHaveBeenCalledTimes(2); }, FAST_WAIT_OPTS); } finally { dispatchRelease.resolve();