From ddcfde148970912893d9e9975de9bb194f6bf2d9 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 15 May 2026 16:02:34 +0100 Subject: [PATCH] fix: propagate room event tool context --- .../telegram/src/bot-message-dispatch.test.ts | 82 +++++++++++++++++++ .../telegram/src/bot-message-dispatch.ts | 2 +- src/agents/cli-runner/prepare.test.ts | 59 +++++++++++++ src/agents/cli-runner/prepare.ts | 1 + src/gateway/mcp-http.loopback-runtime.ts | 1 + src/gateway/mcp-http.request.ts | 8 ++ src/gateway/mcp-http.runtime.ts | 4 + src/gateway/mcp-http.test.ts | 28 ++++++- src/gateway/mcp-http.ts | 2 + src/gateway/tool-resolution.ts | 3 + 10 files changed, 188 insertions(+), 2 deletions(-) diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index c6fbb0f0d03..63a4f2e09aa 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -6,6 +6,7 @@ import { createSequencedTestDraftStream, createTestDraftStream, } from "./draft-stream.test-helpers.js"; +import { notifyTelegramInboundTurnOutboundSuccess } from "./inbound-turn-delivery.js"; type DispatchReplyWithBufferedBlockDispatcherArgs = Parameters< TelegramBotDeps["dispatchReplyWithBufferedBlockDispatcher"] @@ -1641,6 +1642,87 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(groupHistories.get(historyKey)).toHaveLength(1); }); + it("clears delivered room-event history when a newer turn supersedes dispatch", async () => { + const historyKey = "telegram:group:-100123"; + const groupHistories = new Map([ + [historyKey, [{ sender: "Alice", body: "lunch at two", timestamp: 1 }]], + ]); + let firstStarted: (() => void) | undefined; + const firstStartGate = new Promise((resolve) => { + firstStarted = resolve; + }); + let releaseFirst: (() => void) | undefined; + const firstGate = new Promise((resolve) => { + releaseFirst = resolve; + }); + let secondStarted: (() => void) | undefined; + const secondStartGate = new Promise((resolve) => { + secondStarted = resolve; + }); + dispatchReplyWithBufferedBlockDispatcher + .mockImplementationOnce(async () => { + firstStarted?.(); + await firstGate; + return { + queuedFinal: false, + counts: { block: 0, final: 0, tool: 0 }, + sourceReplyDeliveryMode: "message_tool_only", + }; + }) + .mockImplementationOnce(async () => { + secondStarted?.(); + return { + queuedFinal: false, + counts: { block: 0, final: 0, tool: 0 }, + sourceReplyDeliveryMode: "message_tool_only", + }; + }); + + const createRoomContext = (messageId: number, body: string) => + createContext({ + ctxPayload: { + InboundTurnKind: "room_event", + SessionKey: "agent:main:telegram:group:-100123", + ChatType: "group", + MessageSid: String(messageId), + RawBody: body, + BodyForAgent: body, + CommandBody: body, + } as unknown as TelegramMessageContext["ctxPayload"], + msg: { + chat: { id: -100123, type: "supergroup" }, + message_id: messageId, + } as unknown as TelegramMessageContext["msg"], + chatId: -100123, + isGroup: true, + historyKey, + historyLimit: 10, + groupHistories, + threadSpec: { id: undefined, scope: "none" }, + }); + + const firstPromise = dispatchWithContext({ + context: createRoomContext(99, "ambient one"), + streamMode: "partial", + }); + await firstStartGate; + notifyTelegramInboundTurnOutboundSuccess({ + sessionKey: "agent:main:telegram:group:-100123", + to: "telegram:-100123", + inboundTurnKind: "room_event", + }); + const secondPromise = dispatchWithContext({ + context: createRoomContext(100, "ambient two"), + streamMode: "partial", + }); + + await secondStartGate; + releaseFirst?.(); + await Promise.all([firstPromise, secondPromise]); + + expect(groupHistories.get(historyKey)).toHaveLength(0); + }); + it("does not let room events supersede active user-request dispatch", async () => { const historyKey = "telegram:group:-100123"; const groupHistories = new Map([[historyKey, []]]); diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index f0b7fde496c..a93e70fb5fb 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -1681,7 +1681,7 @@ export const dispatchTelegramMessage = async ({ }, }); } - if (!isRoomEvent) { + if (!isRoomEvent || deliveryState.snapshot().delivered) { clearGroupHistory(); } return; diff --git a/src/agents/cli-runner/prepare.test.ts b/src/agents/cli-runner/prepare.test.ts index 287014894ab..7807adb34c6 100644 --- a/src/agents/cli-runner/prepare.test.ts +++ b/src/agents/cli-runner/prepare.test.ts @@ -66,6 +66,7 @@ function createTestMcpLoopbackServerConfig(port: number) { "x-openclaw-agent-id": "${OPENCLAW_MCP_AGENT_ID}", "x-openclaw-account-id": "${OPENCLAW_MCP_ACCOUNT_ID}", "x-openclaw-message-channel": "${OPENCLAW_MCP_MESSAGE_CHANNEL}", + "x-openclaw-inbound-turn-kind": "${OPENCLAW_MCP_INBOUND_TURN_KIND}", }, }, }, @@ -780,6 +781,64 @@ describe("shouldSkipLocalCliCredentialEpoch", () => { } }); + it("passes current turn kind into bundle MCP loopback env", async () => { + const { dir, sessionFile } = createSessionFile(); + try { + const getActiveMcpLoopbackRuntime = vi.fn(() => ({ + port: 31783, + ownerToken: "owner-token", + nonOwnerToken: "non-owner-token", + })); + const ensureMcpLoopbackServer = vi.fn(createTestMcpLoopbackServer); + const createMcpLoopbackServerConfig = vi.fn(createTestMcpLoopbackServerConfig); + setCliRunnerPrepareTestDeps({ + getActiveMcpLoopbackRuntime, + ensureMcpLoopbackServer, + createMcpLoopbackServerConfig, + }); + cliBackendsTesting.setDepsForTest({ + resolvePluginSetupCliBackend: () => undefined, + resolveRuntimeCliBackends: () => [ + { + id: "native-cli", + pluginId: "native-plugin", + bundleMcp: true, + bundleMcpMode: "codex-config-overrides", + config: { + command: "native-cli", + args: ["--print"], + output: "text", + input: "arg", + sessionMode: "existing", + }, + }, + ], + }); + + const context = await prepareCliRunContext({ + sessionId: "session-test", + sessionKey: "agent:main:telegram:group:chat123", + sessionFile, + workspaceDir: dir, + prompt: "latest ask", + provider: "native-cli", + model: "test-model", + timeoutMs: 1_000, + runId: "run-test-room-event-tools", + config: createCliBackendConfig(), + currentTurnKind: "room_event", + messageChannel: "telegram", + }); + + expect(context.preparedBackend.env).toMatchObject({ + OPENCLAW_MCP_MESSAGE_CHANNEL: "telegram", + OPENCLAW_MCP_INBOUND_TURN_KIND: "room_event", + }); + } finally { + fs.rmSync(dir, { recursive: true, force: true }); + } + }); + it("fails closed when a runtime toolsAllow is requested for CLI backends", async () => { const { dir, sessionFile } = createSessionFile(); try { diff --git a/src/agents/cli-runner/prepare.ts b/src/agents/cli-runner/prepare.ts index 82e6ac90daa..60d33fc6739 100644 --- a/src/agents/cli-runner/prepare.ts +++ b/src/agents/cli-runner/prepare.ts @@ -226,6 +226,7 @@ export async function prepareCliRunContext( OPENCLAW_MCP_ACCOUNT_ID: params.agentAccountId ?? "", OPENCLAW_MCP_SESSION_KEY: params.sessionKey ?? "", OPENCLAW_MCP_MESSAGE_CHANNEL: params.messageChannel ?? params.messageProvider ?? "", + OPENCLAW_MCP_INBOUND_TURN_KIND: params.currentTurnKind ?? "", } : undefined, warn: (message) => cliBackendLog.warn(message), diff --git a/src/gateway/mcp-http.loopback-runtime.ts b/src/gateway/mcp-http.loopback-runtime.ts index 565f96820ed..649e36059a1 100644 --- a/src/gateway/mcp-http.loopback-runtime.ts +++ b/src/gateway/mcp-http.loopback-runtime.ts @@ -39,6 +39,7 @@ export function createMcpLoopbackServerConfig(port: number) { "x-openclaw-agent-id": "${OPENCLAW_MCP_AGENT_ID}", "x-openclaw-account-id": "${OPENCLAW_MCP_ACCOUNT_ID}", "x-openclaw-message-channel": "${OPENCLAW_MCP_MESSAGE_CHANNEL}", + "x-openclaw-inbound-turn-kind": "${OPENCLAW_MCP_INBOUND_TURN_KIND}", }, }, }, diff --git a/src/gateway/mcp-http.request.ts b/src/gateway/mcp-http.request.ts index 82b69e2461c..cf9bc3461a2 100644 --- a/src/gateway/mcp-http.request.ts +++ b/src/gateway/mcp-http.request.ts @@ -1,4 +1,5 @@ import type { IncomingMessage, ServerResponse } from "node:http"; +import type { InboundTurnKind } from "../channels/turn/kind.js"; import { resolveMainSessionKey } from "../config/sessions.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { isTruthyEnvValue } from "../infra/env.js"; @@ -29,6 +30,7 @@ type McpRequestContext = { sessionKey: string; messageProvider: string | undefined; accountId: string | undefined; + inboundTurnKind: InboundTurnKind | undefined; senderIsOwner: boolean; }; @@ -37,6 +39,11 @@ function resolveScopedSessionKey(cfg: OpenClawConfig, rawSessionKey: string | un return !trimmed || trimmed === "main" ? resolveMainSessionKey(cfg) : trimmed; } +function normalizeMcpInboundTurnKind(value: string | undefined): InboundTurnKind | undefined { + const trimmed = normalizeOptionalString(value); + return trimmed === "room_event" || trimmed === "user_request" ? trimmed : undefined; +} + function rejectsBrowserLoopbackRequest(req: IncomingMessage): boolean { const origin = getHeader(req, "origin"); if (!origin) { @@ -173,6 +180,7 @@ export function resolveMcpRequestContext( messageProvider: normalizeMessageChannel(getHeader(req, "x-openclaw-message-channel")) ?? undefined, accountId: normalizeOptionalString(getHeader(req, "x-openclaw-account-id")), + inboundTurnKind: normalizeMcpInboundTurnKind(getHeader(req, "x-openclaw-inbound-turn-kind")), senderIsOwner: auth.senderIsOwner, }; } diff --git a/src/gateway/mcp-http.runtime.ts b/src/gateway/mcp-http.runtime.ts index 9a19e61634b..c429cffc10c 100644 --- a/src/gateway/mcp-http.runtime.ts +++ b/src/gateway/mcp-http.runtime.ts @@ -1,4 +1,5 @@ import { applyOwnerOnlyToolPolicy } from "../agents/tool-policy.js"; +import type { InboundTurnKind } from "../channels/turn/kind.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { buildMcpToolSchema, @@ -26,12 +27,14 @@ export class McpLoopbackToolCache { sessionKey: string; messageProvider: string | undefined; accountId: string | undefined; + inboundTurnKind: InboundTurnKind | undefined; senderIsOwner: boolean | undefined; }): CachedScopedTools { const cacheKey = [ params.sessionKey, params.messageProvider ?? "", params.accountId ?? "", + params.inboundTurnKind ?? "", params.senderIsOwner === true ? "owner" : "non-owner", ].join("\u0000"); const now = Date.now(); @@ -45,6 +48,7 @@ export class McpLoopbackToolCache { sessionKey: params.sessionKey, messageProvider: params.messageProvider, accountId: params.accountId, + inboundTurnKind: params.inboundTurnKind, senderIsOwner: params.senderIsOwner, surface: "loopback", excludeToolNames: NATIVE_TOOL_EXCLUDE, diff --git a/src/gateway/mcp-http.test.ts b/src/gateway/mcp-http.test.ts index 6782ef7b69d..20ab87c7c73 100644 --- a/src/gateway/mcp-http.test.ts +++ b/src/gateway/mcp-http.test.ts @@ -154,7 +154,7 @@ afterEach(async () => { }); describe("mcp loopback server", () => { - it("passes session, account, and message channel headers into shared tool resolution", async () => { + it("passes session, account, message channel, and inbound turn headers into shared tool resolution", async () => { const port = await getFreePortBlockWithPermissionFallback({ offsets: [0], fallbackBase: 53_000, @@ -170,6 +170,7 @@ describe("mcp loopback server", () => { "x-session-key": "agent:main:telegram:group:chat123", "x-openclaw-account-id": "work", "x-openclaw-message-channel": "telegram", + "x-openclaw-inbound-turn-kind": "room_event", }, body: JSON.stringify({ jsonrpc: "2.0", id: 1, method: "tools/list" }), }); @@ -179,6 +180,7 @@ describe("mcp loopback server", () => { expect(call.sessionKey).toBe("agent:main:telegram:group:chat123"); expect(call.accountId).toBe("work"); expect(call.messageProvider).toBe("telegram"); + expect(call.inboundTurnKind).toBe("room_event"); expect(call.senderIsOwner).toBe(false); expect(call.surface).toBe("loopback"); expect(Array.from(call.excludeToolNames ?? [])).toEqual([ @@ -191,6 +193,30 @@ describe("mcp loopback server", () => { ]); }); + it("keeps loopback tool cache entries separate by inbound turn kind", async () => { + server = await startMcpLoopbackServer(0); + const runtime = getActiveMcpLoopbackRuntime(); + const sendToolsList = async (inboundTurnKind: string) => + await sendRaw({ + port: server?.port ?? 0, + token: runtime ? resolveMcpLoopbackBearerToken(runtime, false) : undefined, + headers: { + "content-type": "application/json", + "x-session-key": "agent:main:telegram:group:chat123", + "x-openclaw-message-channel": "telegram", + "x-openclaw-inbound-turn-kind": inboundTurnKind, + }, + body: JSON.stringify({ jsonrpc: "2.0", id: 1, method: "tools/list" }), + }); + + expect((await sendToolsList("user_request")).status).toBe(200); + expect((await sendToolsList("room_event")).status).toBe(200); + + expect(resolveGatewayScopedToolsMock).toHaveBeenCalledTimes(2); + expect(getScopedToolsCall(0).inboundTurnKind).toBe("user_request"); + expect(getScopedToolsCall(1).inboundTurnKind).toBe("room_event"); + }); + it("adds empty properties for object schemas that omit properties", async () => { resolveGatewayScopedToolsMock.mockReturnValue({ agentId: "main", diff --git a/src/gateway/mcp-http.ts b/src/gateway/mcp-http.ts index 26ee6ee1cd3..b5069264539 100644 --- a/src/gateway/mcp-http.ts +++ b/src/gateway/mcp-http.ts @@ -110,6 +110,7 @@ export async function startMcpLoopbackServer(port = 0): Promise<{ sessionKey: requestContext.sessionKey, messageProvider: requestContext.messageProvider, accountId: requestContext.accountId, + inboundTurnKind: requestContext.inboundTurnKind, senderIsOwner: requestContext.senderIsOwner, }); @@ -118,6 +119,7 @@ export async function startMcpLoopbackServer(port = 0): Promise<{ batchSize: messages.length, methods: messages.map((message) => message.method), sessionKey: requestContext.sessionKey, + inboundTurnKind: requestContext.inboundTurnKind, senderIsOwner: requestContext.senderIsOwner, toolCount: scopedTools.toolSchema.length, cronVisible: scopedTools.toolSchema.some((tool) => tool.name === "cron"), diff --git a/src/gateway/tool-resolution.ts b/src/gateway/tool-resolution.ts index 181c7c73bf4..daa3446b6c7 100644 --- a/src/gateway/tool-resolution.ts +++ b/src/gateway/tool-resolution.ts @@ -23,6 +23,7 @@ import { resolveToolProfilePolicy, } from "../agents/tool-policy.js"; import type { AnyAgentTool } from "../agents/tools/common.js"; +import type { InboundTurnKind } from "../channels/turn/kind.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { logWarn } from "../logger.js"; import { getPluginToolMeta } from "../plugins/tools.js"; @@ -35,6 +36,7 @@ export function resolveGatewayScopedTools(params: { sessionKey: string; messageProvider?: string; accountId?: string; + inboundTurnKind?: InboundTurnKind; agentTo?: string; agentThreadId?: string; allowGatewaySubagentBinding?: boolean; @@ -133,6 +135,7 @@ export function resolveGatewayScopedTools(params: { agentSessionKey: params.sessionKey, agentChannel: params.messageProvider ?? undefined, agentAccountId: params.accountId, + inboundTurnKind: params.inboundTurnKind, agentTo: params.agentTo, agentThreadId: params.agentThreadId, allowGatewaySubagentBinding: params.allowGatewaySubagentBinding,