fix: namespace chat dedupe by system context

This commit is contained in:
Shakker
2026-06-04 01:00:04 +01:00
committed by Shakker
parent f7e44ac6b5
commit 3da05d01a7
2 changed files with 63 additions and 6 deletions

View File

@@ -521,6 +521,7 @@ function buildActiveChatSendDedupeKey(params: {
message: string;
originatingChannel: string;
sessionKey: string;
systemScope?: string;
}): string | null {
const message = params.message.trim();
if (
@@ -532,8 +533,11 @@ function buildActiveChatSendDedupeKey(params: {
) {
return null;
}
const dedupeParts = params.systemScope?.trim()
? [params.sessionKey, message, params.systemScope.trim()]
: [params.sessionKey, message];
const digest = createHash("sha256")
.update(JSON.stringify([params.sessionKey, message]))
.update(JSON.stringify(dedupeParts))
.digest("hex")
.slice(0, 32);
return `${ACTIVE_CHAT_SEND_DEDUPE_PREFIX}:${digest}`;
@@ -3116,6 +3120,10 @@ export const chatHandlers: GatewayRequestHandlers = {
const inboundMessage = sanitizedMessageResult.message;
const systemInputProvenance = normalizeInputProvenance(p.systemInputProvenance);
const systemProvenanceReceipt = systemReceiptResult.receipt;
const systemDedupeScope =
systemInputProvenance || systemProvenanceReceipt
? JSON.stringify([systemProvenanceReceipt ?? null, systemInputProvenance ?? null])
: undefined;
const stopCommand = isChatStopCommandText(inboundMessage);
const normalizedAttachments = normalizeRpcAttachmentsToChatAttachments(p.attachments);
const rawMessage = inboundMessage.trim();
@@ -3284,6 +3292,7 @@ export const chatHandlers: GatewayRequestHandlers = {
message: rawMessage,
originatingChannel: originatingRoute.originatingChannel,
sessionKey: activeRunScopeKey,
systemScope: systemDedupeScope,
});
if (activeChatSendDedupeKey) {
const activeRunId = resolveActiveChatSendRunId(

View File

@@ -784,7 +784,7 @@ describe("gateway server chat", () => {
},
);
test("chat.send reuses an active internal run for duplicate WebChat text sends", async () => {
test("chat.send reuses only active WebChat text sends with the same system context", async () => {
const sessionDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-"));
const dispatchRelease = createDeferred<void>();
try {
@@ -827,7 +827,7 @@ describe("gateway server chat", () => {
dispatchInboundMessageMock.mockImplementation(async () => dispatchRelease.promise);
const { chatHandlers } = await import("./server-methods/chat.js");
const callSend = (id: string, idempotencyKey: string) =>
const callSend = (id: string, idempotencyKey: string, systemProvenanceReceipt?: string) =>
chatHandlers["chat.send"]({
req: {
type: "req",
@@ -837,12 +837,14 @@ describe("gateway server chat", () => {
sessionKey: "main",
message: "?",
idempotencyKey,
...(systemProvenanceReceipt ? { systemProvenanceReceipt } : {}),
},
},
params: {
sessionKey: "main",
message: "?",
idempotencyKey,
...(systemProvenanceReceipt ? { systemProvenanceReceipt } : {}),
},
client: {
connect: {
@@ -850,7 +852,7 @@ describe("gateway server chat", () => {
id: GATEWAY_CLIENT_NAMES.CONTROL_UI,
mode: GATEWAY_CLIENT_MODES.WEBCHAT,
},
scopes: ["operator.write"],
scopes: ["operator.write", "operator.admin"],
},
} as never,
isWebchatConnect: () => true,
@@ -908,10 +910,56 @@ describe("gateway server chat", () => {
expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(1);
expect(context.addChatRun).toHaveBeenCalledTimes(1);
const withSystemContext = Promise.resolve(
callSend("system-context", "idem-active-c", "proposal=support-file-sampler-b"),
);
await vi.waitFor(
() => {
expect(responses).toEqual([
{
id: "first",
ok: true,
payload: expect.objectContaining({
runId: "idem-active-a",
status: "started",
serverTiming: {
receivedToAckMs: expect.any(Number),
loadSessionMs: expect.any(Number),
},
}),
error: undefined,
},
{
id: "duplicate",
ok: true,
payload: { runId: "idem-active-a", status: "in_flight" },
error: undefined,
},
{
id: "system-context",
ok: true,
payload: expect.objectContaining({
runId: "idem-active-c",
status: "started",
serverTiming: {
receivedToAckMs: expect.any(Number),
loadSessionMs: expect.any(Number),
},
}),
error: undefined,
},
]);
},
{ timeout: 2_000, interval: 5 },
);
expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(2);
expect(context.addChatRun).toHaveBeenCalledTimes(2);
dispatchRelease.resolve();
await first;
await Promise.all([first, withSystemContext]);
await vi.waitFor(() => {
expect(context.removeChatRun).toHaveBeenCalledTimes(1);
expect(context.removeChatRun).toHaveBeenCalledTimes(2);
}, FAST_WAIT_OPTS);
} finally {
dispatchRelease.resolve();