diff --git a/extensions/telegram/src/action-runtime.test.ts b/extensions/telegram/src/action-runtime.test.ts index e0d934c7c21..a936505a9a0 100644 --- a/extensions/telegram/src/action-runtime.test.ts +++ b/extensions/telegram/src/action-runtime.test.ts @@ -408,6 +408,42 @@ describe("handleTelegramAction", () => { end(); }); + it("marks room-event delivery correlations separately", async () => { + let roomEventCount = 0; + let userRequestCount = 0; + const endRoomEvent = beginTelegramInboundTurnDeliveryCorrelation( + "telegram-session", + { + outboundTo: "@testchannel", + markInboundTurnDelivered: () => { + roomEventCount += 1; + }, + }, + { inboundTurnKind: "room_event" }, + ); + const endUserRequest = beginTelegramInboundTurnDeliveryCorrelation("telegram-session", { + outboundTo: "@testchannel", + markInboundTurnDelivered: () => { + userRequestCount += 1; + }, + }); + + await handleTelegramAction( + { + action: "sendMessage", + to: "@testchannel", + content: "Hello from a room event", + }, + telegramConfig(), + { sessionKey: "telegram-session", inboundTurnKind: "room_event" }, + ); + + expect(roomEventCount).toBe(1); + expect(userRequestCount).toBe(0); + endRoomEvent(); + endUserRequest(); + }); + it("accepts shared send action aliases", async () => { await handleTelegramAction( { diff --git a/extensions/telegram/src/action-runtime.ts b/extensions/telegram/src/action-runtime.ts index 9c872112601..7791add854f 100644 --- a/extensions/telegram/src/action-runtime.ts +++ b/extensions/telegram/src/action-runtime.ts @@ -19,7 +19,10 @@ import { import type { MessagePresentation } from "openclaw/plugin-sdk/interactive-runtime"; import { createTelegramActionGate, resolveTelegramPollActionGateState } from "./accounts.js"; import { resolveTelegramInlineButtons } from "./button-types.js"; -import { notifyTelegramInboundTurnOutboundSuccess } from "./inbound-turn-delivery.js"; +import { + notifyTelegramInboundTurnOutboundSuccess, + type TelegramInboundTurnDeliveryKind, +} from "./inbound-turn-delivery.js"; import { resolveTelegramInlineButtonsScope, resolveTelegramTargetChatType, @@ -230,6 +233,7 @@ export async function handleTelegramAction( mediaLocalRoots?: readonly string[]; mediaReadFile?: (filePath: string) => Promise; sessionKey?: string | null; + inboundTurnKind?: TelegramInboundTurnDeliveryKind | string; }, ): Promise> { const { action, accountId } = { @@ -400,6 +404,7 @@ export async function handleTelegramAction( sessionKey: options?.sessionKey ?? undefined, to, accountId, + inboundTurnKind: options?.inboundTurnKind, }); await maybePinTelegramActionSend({ args: params, diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index 231e7f06beb..c6fbb0f0d03 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -1724,6 +1724,91 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(deliveredTexts).toContain("visible request answer"); }); + it("lets user requests supersede active room-event dispatch", async () => { + const historyKey = "telegram:group:-100123"; + const groupHistories = new Map([[historyKey, []]]); + let roomEventStarted: (() => void) | undefined; + const roomEventStartGate = new Promise((resolve) => { + roomEventStarted = resolve; + }); + let releaseRoomEvent: (() => void) | undefined; + const roomEventGate = new Promise((resolve) => { + releaseRoomEvent = resolve; + }); + let userRequestStarted: (() => void) | undefined; + const userRequestStartGate = new Promise((resolve) => { + userRequestStarted = resolve; + }); + dispatchReplyWithBufferedBlockDispatcher + .mockImplementationOnce(async ({ dispatcherOptions }) => { + roomEventStarted?.(); + await roomEventGate; + await dispatcherOptions.deliver({ text: "stale ambient answer" }, { kind: "final" }); + return { + queuedFinal: true, + counts: { block: 0, final: 1, tool: 0 }, + sourceReplyDeliveryMode: "message_tool_only", + }; + }) + .mockImplementationOnce(async ({ dispatcherOptions }) => { + userRequestStarted?.(); + await dispatcherOptions.deliver({ text: "fresh request answer" }, { kind: "final" }); + return { + queuedFinal: true, + counts: { block: 0, final: 1, tool: 0 }, + }; + }); + + const createGroupContext = ( + kind: "user_request" | "room_event", + messageId: number, + body: string, + ) => + createContext({ + ctxPayload: { + InboundTurnKind: kind, + SessionKey: "agent:main:telegram:group:-100123", + ChatType: "group", + MessageSid: String(messageId), + RawBody: body, + BodyForAgent: body, + CommandBody: body, + CommandAuthorized: true, + } as unknown as TelegramMessageContext["ctxPayload"], + msg: { + chat: { id: -100123, type: "supergroup" }, + message_id: messageId, + } as unknown as TelegramMessageContext["msg"], + chatId: -100123, + isGroup: true, + historyKey, + historyLimit: 10, + groupHistories, + threadSpec: { id: undefined, scope: "none" }, + }); + + const roomEventPromise = dispatchWithContext({ + context: createGroupContext("room_event", 99, "ambient chatter"), + streamMode: "off", + }); + await roomEventStartGate; + const userRequestPromise = dispatchWithContext({ + context: createGroupContext("user_request", 100, "@bot answer now"), + streamMode: "off", + }); + await userRequestStartGate; + releaseRoomEvent?.(); + await Promise.all([roomEventPromise, userRequestPromise]); + + const deliveredTexts = deliverReplies.mock.calls.flatMap((call) => + ((call[0] as { replies?: Array<{ text?: string }> }).replies ?? []).map( + (reply) => reply.text, + ), + ); + expect(deliveredTexts).toContain("fresh request answer"); + expect(deliveredTexts).not.toContain("stale ambient answer"); + }); + it("does not send visible error fallbacks for room events", async () => { const historyKey = "telegram:group:-100123"; const groupHistories = new Map([ diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index 98efe85315e..f0b7fde496c 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -168,6 +168,11 @@ type TelegramReplyFenceState = { activeDispatches: number; }; +type TelegramReplyFenceKey = { + activeKey: string; + roomEventKey: string; +}; + // Newer accepted turns and authorized aborts can arrive ahead of older same-session reply work. const telegramReplyFenceByKey = new Map(); @@ -183,12 +188,16 @@ function resolveTelegramReplyFenceKey(params: { ctxPayload: { SessionKey?: string; CommandTargetSessionKey?: string; InboundTurnKind?: string }; chatId: number | string; threadSpec: { id?: number | string | null; scope?: string }; -}): string { +}): TelegramReplyFenceKey { const baseKey = normalizeTelegramFenceKey(params.ctxPayload.CommandTargetSessionKey) ?? normalizeTelegramFenceKey(params.ctxPayload.SessionKey) ?? `telegram:${String(params.chatId)}:${params.threadSpec.scope ?? "default"}:${params.threadSpec.id ?? "root"}`; - return params.ctxPayload.InboundTurnKind === "room_event" ? `${baseKey}:room_event` : baseKey; + const roomEventKey = `${baseKey}:room_event`; + return { + activeKey: params.ctxPayload.InboundTurnKind === "room_event" ? roomEventKey : baseKey, + roomEventKey, + }; } function beginTelegramReplyFence(params: { key: string; supersede: boolean }): number { @@ -205,6 +214,15 @@ function beginTelegramReplyFence(params: { key: string; supersede: boolean }): n return state.generation; } +function supersedeTelegramReplyFence(key: string): void { + const state = telegramReplyFenceByKey.get(key); + if (!state) { + return; + } + state.generation += 1; + telegramReplyFenceByKey.set(key, state); +} + function isTelegramReplyFenceSuperseded(params: { key: string; generation: number }): boolean { return (telegramReplyFenceByKey.get(params.key)?.generation ?? 0) !== params.generation; } @@ -459,14 +477,14 @@ export const dispatchTelegramMessage = async ({ const isDispatchSuperseded = () => replyFenceGeneration !== undefined && isTelegramReplyFenceSuperseded({ - key: replyFenceKey, + key: replyFenceKey.activeKey, generation: replyFenceGeneration, }); const releaseReplyFence = () => { if (replyFenceGeneration === undefined) { return; } - endTelegramReplyFence(replyFenceKey); + endTelegramReplyFence(replyFenceKey.activeKey); replyFenceGeneration = undefined; }; const draftMaxChars = Math.min(textLimit, 4096); @@ -851,9 +869,13 @@ export const dispatchTelegramMessage = async ({ const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId); + const supersedeReplyFence = shouldSupersedeTelegramReplyFence(ctxPayload); + if (!isRoomEvent && supersedeReplyFence) { + supersedeTelegramReplyFence(replyFenceKey.roomEventKey); + } replyFenceGeneration = beginTelegramReplyFence({ - key: replyFenceKey, - supersede: shouldSupersedeTelegramReplyFence(ctxPayload), + key: replyFenceKey.activeKey, + supersede: supersedeReplyFence, }); const implicitQuoteReplyTargetId = @@ -875,6 +897,7 @@ export const dispatchTelegramMessage = async ({ outboundAccountId: route.accountId, markInboundTurnDelivered: () => deliveryState.markDelivered(), }, + { inboundTurnKind: ctxPayload.InboundTurnKind }, ); const clearGroupHistory = () => { if (isGroup && historyKey) { diff --git a/extensions/telegram/src/channel-actions.ts b/extensions/telegram/src/channel-actions.ts index f0e511ab5c4..bb58edbf7c8 100644 --- a/extensions/telegram/src/channel-actions.ts +++ b/extensions/telegram/src/channel-actions.ts @@ -184,6 +184,7 @@ export const telegramMessageActions: ChannelMessageActionAdapter = { accountId, mediaLocalRoots, sessionKey, + inboundTurnKind, toolContext, }) => { const telegramAction = resolveTelegramMessageActionName(action); @@ -202,7 +203,7 @@ export const telegramMessageActions: ChannelMessageActionAdapter = { : {}), }, cfg, - { mediaLocalRoots, sessionKey }, + { mediaLocalRoots, sessionKey, inboundTurnKind }, ); }, }; diff --git a/extensions/telegram/src/inbound-turn-delivery.test.ts b/extensions/telegram/src/inbound-turn-delivery.test.ts index ec2c334dff7..b73a08afa17 100644 --- a/extensions/telegram/src/inbound-turn-delivery.test.ts +++ b/extensions/telegram/src/inbound-turn-delivery.test.ts @@ -45,4 +45,43 @@ describe("telegram inbound turn delivery", () => { expect(count).toBe(0); end(); }); + + it("keeps user-request and room-event delivery correlations separate", () => { + let userRequestCount = 0; + let roomEventCount = 0; + const endUserRequest = beginTelegramInboundTurnDeliveryCorrelation("sess:x", { + outboundTo: "999", + markInboundTurnDelivered: () => { + userRequestCount += 1; + }, + }); + const endRoomEvent = beginTelegramInboundTurnDeliveryCorrelation( + "sess:x", + { + outboundTo: "999", + markInboundTurnDelivered: () => { + roomEventCount += 1; + }, + }, + { inboundTurnKind: "room_event" }, + ); + + notifyTelegramInboundTurnOutboundSuccess({ + sessionKey: "sess:x", + to: "999", + inboundTurnKind: "room_event", + }); + expect(roomEventCount).toBe(1); + expect(userRequestCount).toBe(0); + + notifyTelegramInboundTurnOutboundSuccess({ + sessionKey: "sess:x", + to: "999", + }); + expect(roomEventCount).toBe(1); + expect(userRequestCount).toBe(1); + + endRoomEvent(); + endUserRequest(); + }); }); diff --git a/extensions/telegram/src/inbound-turn-delivery.ts b/extensions/telegram/src/inbound-turn-delivery.ts index 7bdb2f4a0b3..78cae4ba287 100644 --- a/extensions/telegram/src/inbound-turn-delivery.ts +++ b/extensions/telegram/src/inbound-turn-delivery.ts @@ -1,4 +1,5 @@ export type TelegramInboundTurnDeliveryEnd = () => void; +export type TelegramInboundTurnDeliveryKind = "user_request" | "room_event"; type ActiveTurn = { outboundTo: string; @@ -8,11 +9,26 @@ type ActiveTurn = { const registry = new Map(); +export function resolveTelegramInboundTurnDeliveryCorrelationKey( + sessionKey: string | undefined, + inboundTurnKind?: TelegramInboundTurnDeliveryKind | string, +): string | undefined { + const key = sessionKey?.trim(); + if (!key) { + return undefined; + } + return inboundTurnKind === "room_event" ? `${key}:room_event` : key; +} + export function beginTelegramInboundTurnDeliveryCorrelation( sessionKey: string | undefined, turn: ActiveTurn, + options?: { inboundTurnKind?: TelegramInboundTurnDeliveryKind | string }, ): TelegramInboundTurnDeliveryEnd { - const key = sessionKey?.trim(); + const key = resolveTelegramInboundTurnDeliveryCorrelationKey( + sessionKey, + options?.inboundTurnKind, + ); if (!key) { return () => {}; } @@ -26,8 +42,12 @@ export function notifyTelegramInboundTurnOutboundSuccess(params: { sessionKey: string | undefined; to: string; accountId?: string | null; + inboundTurnKind?: TelegramInboundTurnDeliveryKind | string; }): void { - const key = params.sessionKey?.trim(); + const key = resolveTelegramInboundTurnDeliveryCorrelationKey( + params.sessionKey, + params.inboundTurnKind, + ); if (!key) { return; } diff --git a/src/agents/openclaw-tools.ts b/src/agents/openclaw-tools.ts index 8cf06c88029..2a25aa8ac13 100644 --- a/src/agents/openclaw-tools.ts +++ b/src/agents/openclaw-tools.ts @@ -1,4 +1,5 @@ import type { SourceReplyDeliveryMode } from "../auto-reply/get-reply-options.types.js"; +import type { InboundTurnKind } from "../channels/turn/kind.js"; import { selectApplicableRuntimeConfig } from "../config/config.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { callGateway } from "../gateway/call.js"; @@ -120,6 +121,7 @@ export function createOpenClawTools( requireExplicitMessageTarget?: boolean; /** Visible source replies must be sent through the message tool when set to message_tool_only. */ sourceReplyDeliveryMode?: SourceReplyDeliveryMode; + inboundTurnKind?: InboundTurnKind; /** If true, omit the message tool from the tool list. */ disableMessageTool?: boolean; /** If true, include the heartbeat response tool for structured heartbeat outcomes. */ @@ -299,6 +301,7 @@ export function createOpenClawTools( sandboxRoot: options?.sandboxRoot, requireExplicitTarget: options?.requireExplicitMessageTarget, sourceReplyDeliveryMode: options?.sourceReplyDeliveryMode, + inboundTurnKind: options?.inboundTurnKind, requesterSenderId: options?.requesterSenderId ?? undefined, senderIsOwner: options?.senderIsOwner, }); diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 445ea8be9c4..bc38b9315d0 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -1111,6 +1111,7 @@ export async function runEmbeddedAttempt( requireExplicitMessageTarget: params.requireExplicitMessageTarget ?? isSubagentSessionKey(params.sessionKey), sourceReplyDeliveryMode: params.sourceReplyDeliveryMode, + inboundTurnKind: params.currentTurnKind, disableMessageTool: params.disableMessageTool, forceMessageTool: params.forceMessageTool, enableHeartbeatTool: params.enableHeartbeatTool, diff --git a/src/agents/pi-tools.ts b/src/agents/pi-tools.ts index 23c6b5f34d6..88d252a836d 100644 --- a/src/agents/pi-tools.ts +++ b/src/agents/pi-tools.ts @@ -1,6 +1,7 @@ import { createCodingTools, createReadTool } from "@earendil-works/pi-coding-agent"; import type { SourceReplyDeliveryMode } from "../auto-reply/get-reply-options.types.js"; import { HEARTBEAT_RESPONSE_TOOL_NAME } from "../auto-reply/heartbeat-tool-response.js"; +import type { InboundTurnKind } from "../channels/turn/kind.js"; import { resolveExecCommandHighlighting } from "../config/exec-command-highlighting.js"; import type { ModelCompatConfig } from "../config/types.models.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; @@ -409,6 +410,7 @@ export function createOpenClawCodingTools(options?: { requireExplicitMessageTarget?: boolean; /** Visible source replies must be sent through the message tool when set to message_tool_only. */ sourceReplyDeliveryMode?: SourceReplyDeliveryMode; + inboundTurnKind?: InboundTurnKind; /** If true, omit the message tool from the tool list. */ disableMessageTool?: boolean; /** Keep the message tool available even when the selected profile omits it. */ @@ -908,6 +910,7 @@ export function createOpenClawCodingTools(options?: { modelHasVision: options?.modelHasVision, requireExplicitMessageTarget: options?.requireExplicitMessageTarget, sourceReplyDeliveryMode: options?.sourceReplyDeliveryMode, + inboundTurnKind: options?.inboundTurnKind, disableMessageTool: options?.disableMessageTool, enableHeartbeatTool, disablePluginTools: !includePluginTools, diff --git a/src/agents/tools/message-tool.ts b/src/agents/tools/message-tool.ts index 661770339b2..c4555eee51d 100644 --- a/src/agents/tools/message-tool.ts +++ b/src/agents/tools/message-tool.ts @@ -11,6 +11,7 @@ import { import { CHANNEL_MESSAGE_ACTION_NAMES } from "../../channels/plugins/message-action-names.js"; import type { ChannelMessageCapability } from "../../channels/plugins/message-capabilities.js"; import type { ChannelMessageActionName } from "../../channels/plugins/types.public.js"; +import type { InboundTurnKind } from "../../channels/turn/kind.js"; import { resolveCommandSecretRefsViaGateway } from "../../cli/command-secret-gateway.js"; import { getScopedChannelsCommandSecretTargets } from "../../cli/command-secret-targets.js"; import { resolveMessageSecretScope } from "../../cli/message-secret-scope.js"; @@ -570,6 +571,7 @@ type MessageToolOptions = { sandboxRoot?: string; requireExplicitTarget?: boolean; sourceReplyDeliveryMode?: SourceReplyDeliveryMode; + inboundTurnKind?: InboundTurnKind; requesterSenderId?: string; senderIsOwner?: boolean; }; @@ -960,6 +962,7 @@ export function createMessageTool(options?: MessageToolOptions): AnyAgentTool { agentId: resolvedAgentId, sandboxRoot: options?.sandboxRoot, sourceReplyDeliveryMode: options?.sourceReplyDeliveryMode, + inboundTurnKind: options?.inboundTurnKind, abortSignal: signal, }); diff --git a/src/channels/plugins/types.core.ts b/src/channels/plugins/types.core.ts index 0d6e7e34af6..b54478c37c3 100644 --- a/src/channels/plugins/types.core.ts +++ b/src/channels/plugins/types.core.ts @@ -9,6 +9,7 @@ import type { MessagePresentation } from "../../interactive/payload.js"; import type { OutboundMediaAccess } from "../../media/load-options.js"; import type { PollInput } from "../../polls.js"; import type { ChatType } from "../chat-type.js"; +import type { InboundTurnKind } from "../turn/kind.js"; import type { ChannelId } from "./channel-id.types.js"; import type { ChannelMessageActionName as ChannelMessageActionNameFromList } from "./message-action-names.js"; import type { ChannelMessageCapability } from "./message-capabilities.js"; @@ -681,6 +682,7 @@ export type ChannelMessageActionContext = { senderIsOwner?: boolean; sessionKey?: string | null; sessionId?: string | null; + inboundTurnKind?: InboundTurnKind; agentId?: string | null; gateway?: { url?: string; diff --git a/src/infra/outbound/message-action-runner.ts b/src/infra/outbound/message-action-runner.ts index 8413d94cceb..405d5a79117 100644 --- a/src/infra/outbound/message-action-runner.ts +++ b/src/infra/outbound/message-action-runner.ts @@ -16,6 +16,7 @@ import type { ChannelMessageActionName, ChannelThreadingToolContext, } from "../../channels/plugins/types.public.js"; +import type { InboundTurnKind } from "../../channels/turn/kind.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { hasInteractiveReplyBlocks, @@ -121,6 +122,7 @@ export type RunMessageActionParams = { sandboxRoot?: string; dryRun?: boolean; sourceReplyDeliveryMode?: SourceReplyDeliveryMode; + inboundTurnKind?: InboundTurnKind; abortSignal?: AbortSignal; }; @@ -556,6 +558,7 @@ async function runGatewayPluginMessageActionOrNull(params: { senderIsOwner: params.input.senderIsOwner, sessionKey: params.input.sessionKey, sessionId: params.input.sessionId, + inboundTurnKind: params.input.inboundTurnKind, agentId: params.agentId, toolContext: params.input.toolContext, idempotencyKey: await resolveGatewayActionIdempotencyKey( @@ -937,6 +940,7 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise