From 8bc4d4bcd4049485d6c5cb37413b31834796ec3b Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 26 Apr 2026 10:46:52 +0100 Subject: [PATCH] fix: prevent duplicate chat attachment send races --- CHANGELOG.md | 1 + src/gateway/server-methods/chat.ts | 7 + .../server.chat.gateway-server-chat-b.test.ts | 136 ++++++++++++++++++ 3 files changed, 144 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f26f6df952..b9725795c0a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ Docs: https://docs.openclaw.ai - Plugins/CLI: make plugin install and uninstall config writes conflict-aware, clear stale denylist entries on explicit reinstall/removal, and delete managed plugin files only after config/index commit succeeds. Thanks @codex. - Plugins: fail `plugins update` when tracked plugin or hook updates error, keep bundled runtime-dependency repair behind restrictive allowlists, and reject package installs with unloadable extension entries. Thanks @codex. +- Gateway/chat: keep duplicate attachment-backed `chat.send` retries with the same idempotency key on the documented in-flight path so aborts still target the real active run. Fixes #70139. Thanks @Feelw00. ## 2026.4.26 diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 4f1d0769b92..eca437d3edd 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -1854,6 +1854,13 @@ export const chatHandlers: GatewayRequestHandlers = { ownerDeviceId: normalizeOptionalText(client?.connect?.device?.id), kind: "chat-send", }); + if (!activeRunAbort.registered) { + respond(true, { runId: clientRunId, status: "in_flight" as const }, undefined, { + cached: true, + runId: clientRunId, + }); + return; + } context.addChatRun(clientRunId, { sessionKey, clientRunId, diff --git a/src/gateway/server.chat.gateway-server-chat-b.test.ts b/src/gateway/server.chat.gateway-server-chat-b.test.ts index 1039405181f..7427ce08cfc 100644 --- a/src/gateway/server.chat.gateway-server-chat-b.test.ts +++ b/src/gateway/server.chat.gateway-server-chat-b.test.ts @@ -5,9 +5,11 @@ import { afterAll, beforeAll, describe, expect, test, vi } from "vitest"; import type { GetReplyOptions } from "../auto-reply/get-reply-options.types.js"; import { clearConfigCache } from "../config/config.js"; import { __setMaxChatHistoryMessagesBytesForTest } from "./server-constants.js"; +import type { GatewayRequestContext, RespondFn } from "./server-methods/shared-types.js"; import { connectOk, createGatewaySuiteHarness, + dispatchInboundMessageMock, getReplyFromConfig, installGatewayTestHooks, mockGetReplyFromConfigOnce, @@ -47,6 +49,16 @@ const sendReq = ( ); }; +function createDeferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + async function withGatewayChatHarness( run: (ctx: { ws: GatewaySocket; createSessionDir: () => Promise }) => Promise, ) { @@ -123,6 +135,130 @@ async function prepareMainHistoryHarness(params: { } describe("gateway server chat", () => { + test("chat.send returns in_flight when duplicate attachment send wins parsing race", async () => { + const sessionDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); + const dispatchRelease = createDeferred(); + try { + testState.sessionStorePath = path.join(sessionDir, "sessions.json"); + await writeSessionStore({ + entries: { + main: { + sessionId: "sess-main", + modelProvider: "test-provider", + model: "vision-model", + updatedAt: Date.now(), + }, + }, + }); + + const firstCatalog = + createDeferred>>(); + const responses: Array<{ id: string; ok: boolean; payload?: unknown; error?: unknown }> = []; + const context = { + loadGatewayModelCatalog: vi + .fn() + .mockImplementationOnce(() => firstCatalog.promise) + .mockResolvedValue([ + { + id: "vision-model", + name: "Vision Model", + provider: "test-provider", + input: ["text", "image"], + }, + ]), + logGateway: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }, + agentRunSeq: new Map(), + chatAbortControllers: new Map(), + chatAbortedRuns: new Map(), + chatRunBuffers: new Map(), + chatDeltaSentAt: new Map(), + chatDeltaLastBroadcastLen: new Map(), + addChatRun: vi.fn(), + removeChatRun: vi.fn(), + broadcast: vi.fn(), + nodeSendToSession: vi.fn(), + registerToolEventRecipient: vi.fn(), + dedupe: new Map(), + } as unknown as GatewayRequestContext; + dispatchInboundMessageMock.mockImplementation(async () => dispatchRelease.promise); + + const pngB64 = + "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/woAAn8B9FD5fHAAAAAASUVORK5CYII="; + const params = { + sessionKey: "main", + message: "see image", + idempotencyKey: "idem-attachment-race", + attachments: [ + { + type: "image", + mimeType: "image/png", + fileName: "dot.png", + content: pngB64, + }, + ], + }; + const { chatHandlers } = await import("./server-methods/chat.js"); + const callSend = (id: string) => + chatHandlers["chat.send"]({ + req: { type: "req", id, method: "chat.send", params }, + params, + client: null, + isWebchatConnect: () => false, + respond: ((ok, payload, error) => { + responses.push({ id, ok, payload, error }); + }) as RespondFn, + context, + }); + + const first = Promise.resolve(callSend("first")); + await vi.waitFor(() => { + expect(context.loadGatewayModelCatalog).toHaveBeenCalledTimes(1); + }, FAST_WAIT_OPTS); + + await callSend("duplicate"); + expect(responses).toContainEqual({ + id: "duplicate", + ok: true, + payload: { runId: "idem-attachment-race", status: "started" }, + error: undefined, + }); + + firstCatalog.resolve([ + { + id: "vision-model", + name: "Vision Model", + provider: "test-provider", + input: ["text", "image"], + }, + ]); + await first; + + expect(responses).toContainEqual({ + id: "first", + ok: true, + payload: { runId: "idem-attachment-race", status: "in_flight" }, + error: undefined, + }); + expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(1); + expect(context.addChatRun).toHaveBeenCalledTimes(1); + dispatchRelease.resolve(); + await vi.waitFor(() => { + expect(context.removeChatRun).toHaveBeenCalledTimes(1); + }, FAST_WAIT_OPTS); + } finally { + dispatchRelease.resolve(); + dispatchInboundMessageMock.mockReset(); + testState.sessionStorePath = undefined; + clearConfigCache(); + await fs.rm(sessionDir, { recursive: true, force: true }); + } + }); + test("chat.history backfills claude-cli sessions from Claude project files", async () => { await withGatewayChatHarness(async ({ ws, createSessionDir }) => { await connectOk(ws);