fix: isolate telegram room event delivery correlation

This commit is contained in:
Peter Steinberger
2026-05-15 15:36:00 +01:00
parent 8c8241140e
commit b5fde36c41
14 changed files with 238 additions and 10 deletions

View File

@@ -408,6 +408,42 @@ describe("handleTelegramAction", () => {
end();
});
it("marks room-event delivery correlations separately", async () => {
let roomEventCount = 0;
let userRequestCount = 0;
const endRoomEvent = beginTelegramInboundTurnDeliveryCorrelation(
"telegram-session",
{
outboundTo: "@testchannel",
markInboundTurnDelivered: () => {
roomEventCount += 1;
},
},
{ inboundTurnKind: "room_event" },
);
const endUserRequest = beginTelegramInboundTurnDeliveryCorrelation("telegram-session", {
outboundTo: "@testchannel",
markInboundTurnDelivered: () => {
userRequestCount += 1;
},
});
await handleTelegramAction(
{
action: "sendMessage",
to: "@testchannel",
content: "Hello from a room event",
},
telegramConfig(),
{ sessionKey: "telegram-session", inboundTurnKind: "room_event" },
);
expect(roomEventCount).toBe(1);
expect(userRequestCount).toBe(0);
endRoomEvent();
endUserRequest();
});
it("accepts shared send action aliases", async () => {
await handleTelegramAction(
{

View File

@@ -19,7 +19,10 @@ import {
import type { MessagePresentation } from "openclaw/plugin-sdk/interactive-runtime";
import { createTelegramActionGate, resolveTelegramPollActionGateState } from "./accounts.js";
import { resolveTelegramInlineButtons } from "./button-types.js";
import { notifyTelegramInboundTurnOutboundSuccess } from "./inbound-turn-delivery.js";
import {
notifyTelegramInboundTurnOutboundSuccess,
type TelegramInboundTurnDeliveryKind,
} from "./inbound-turn-delivery.js";
import {
resolveTelegramInlineButtonsScope,
resolveTelegramTargetChatType,
@@ -230,6 +233,7 @@ export async function handleTelegramAction(
mediaLocalRoots?: readonly string[];
mediaReadFile?: (filePath: string) => Promise<Buffer>;
sessionKey?: string | null;
inboundTurnKind?: TelegramInboundTurnDeliveryKind | string;
},
): Promise<AgentToolResult<unknown>> {
const { action, accountId } = {
@@ -400,6 +404,7 @@ export async function handleTelegramAction(
sessionKey: options?.sessionKey ?? undefined,
to,
accountId,
inboundTurnKind: options?.inboundTurnKind,
});
await maybePinTelegramActionSend({
args: params,

View File

@@ -1724,6 +1724,91 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(deliveredTexts).toContain("visible request answer");
});
it("lets user requests supersede active room-event dispatch", async () => {
const historyKey = "telegram:group:-100123";
const groupHistories = new Map([[historyKey, []]]);
let roomEventStarted: (() => void) | undefined;
const roomEventStartGate = new Promise<void>((resolve) => {
roomEventStarted = resolve;
});
let releaseRoomEvent: (() => void) | undefined;
const roomEventGate = new Promise<void>((resolve) => {
releaseRoomEvent = resolve;
});
let userRequestStarted: (() => void) | undefined;
const userRequestStartGate = new Promise<void>((resolve) => {
userRequestStarted = resolve;
});
dispatchReplyWithBufferedBlockDispatcher
.mockImplementationOnce(async ({ dispatcherOptions }) => {
roomEventStarted?.();
await roomEventGate;
await dispatcherOptions.deliver({ text: "stale ambient answer" }, { kind: "final" });
return {
queuedFinal: true,
counts: { block: 0, final: 1, tool: 0 },
sourceReplyDeliveryMode: "message_tool_only",
};
})
.mockImplementationOnce(async ({ dispatcherOptions }) => {
userRequestStarted?.();
await dispatcherOptions.deliver({ text: "fresh request answer" }, { kind: "final" });
return {
queuedFinal: true,
counts: { block: 0, final: 1, tool: 0 },
};
});
const createGroupContext = (
kind: "user_request" | "room_event",
messageId: number,
body: string,
) =>
createContext({
ctxPayload: {
InboundTurnKind: kind,
SessionKey: "agent:main:telegram:group:-100123",
ChatType: "group",
MessageSid: String(messageId),
RawBody: body,
BodyForAgent: body,
CommandBody: body,
CommandAuthorized: true,
} as unknown as TelegramMessageContext["ctxPayload"],
msg: {
chat: { id: -100123, type: "supergroup" },
message_id: messageId,
} as unknown as TelegramMessageContext["msg"],
chatId: -100123,
isGroup: true,
historyKey,
historyLimit: 10,
groupHistories,
threadSpec: { id: undefined, scope: "none" },
});
const roomEventPromise = dispatchWithContext({
context: createGroupContext("room_event", 99, "ambient chatter"),
streamMode: "off",
});
await roomEventStartGate;
const userRequestPromise = dispatchWithContext({
context: createGroupContext("user_request", 100, "@bot answer now"),
streamMode: "off",
});
await userRequestStartGate;
releaseRoomEvent?.();
await Promise.all([roomEventPromise, userRequestPromise]);
const deliveredTexts = deliverReplies.mock.calls.flatMap((call) =>
((call[0] as { replies?: Array<{ text?: string }> }).replies ?? []).map(
(reply) => reply.text,
),
);
expect(deliveredTexts).toContain("fresh request answer");
expect(deliveredTexts).not.toContain("stale ambient answer");
});
it("does not send visible error fallbacks for room events", async () => {
const historyKey = "telegram:group:-100123";
const groupHistories = new Map([

View File

@@ -168,6 +168,11 @@ type TelegramReplyFenceState = {
activeDispatches: number;
};
type TelegramReplyFenceKey = {
activeKey: string;
roomEventKey: string;
};
// Newer accepted turns and authorized aborts can arrive ahead of older same-session reply work.
const telegramReplyFenceByKey = new Map<string, TelegramReplyFenceState>();
@@ -183,12 +188,16 @@ function resolveTelegramReplyFenceKey(params: {
ctxPayload: { SessionKey?: string; CommandTargetSessionKey?: string; InboundTurnKind?: string };
chatId: number | string;
threadSpec: { id?: number | string | null; scope?: string };
}): string {
}): TelegramReplyFenceKey {
const baseKey =
normalizeTelegramFenceKey(params.ctxPayload.CommandTargetSessionKey) ??
normalizeTelegramFenceKey(params.ctxPayload.SessionKey) ??
`telegram:${String(params.chatId)}:${params.threadSpec.scope ?? "default"}:${params.threadSpec.id ?? "root"}`;
return params.ctxPayload.InboundTurnKind === "room_event" ? `${baseKey}:room_event` : baseKey;
const roomEventKey = `${baseKey}:room_event`;
return {
activeKey: params.ctxPayload.InboundTurnKind === "room_event" ? roomEventKey : baseKey,
roomEventKey,
};
}
function beginTelegramReplyFence(params: { key: string; supersede: boolean }): number {
@@ -205,6 +214,15 @@ function beginTelegramReplyFence(params: { key: string; supersede: boolean }): n
return state.generation;
}
function supersedeTelegramReplyFence(key: string): void {
const state = telegramReplyFenceByKey.get(key);
if (!state) {
return;
}
state.generation += 1;
telegramReplyFenceByKey.set(key, state);
}
function isTelegramReplyFenceSuperseded(params: { key: string; generation: number }): boolean {
return (telegramReplyFenceByKey.get(params.key)?.generation ?? 0) !== params.generation;
}
@@ -459,14 +477,14 @@ export const dispatchTelegramMessage = async ({
const isDispatchSuperseded = () =>
replyFenceGeneration !== undefined &&
isTelegramReplyFenceSuperseded({
key: replyFenceKey,
key: replyFenceKey.activeKey,
generation: replyFenceGeneration,
});
const releaseReplyFence = () => {
if (replyFenceGeneration === undefined) {
return;
}
endTelegramReplyFence(replyFenceKey);
endTelegramReplyFence(replyFenceKey.activeKey);
replyFenceGeneration = undefined;
};
const draftMaxChars = Math.min(textLimit, 4096);
@@ -851,9 +869,13 @@ export const dispatchTelegramMessage = async ({
const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId);
const supersedeReplyFence = shouldSupersedeTelegramReplyFence(ctxPayload);
if (!isRoomEvent && supersedeReplyFence) {
supersedeTelegramReplyFence(replyFenceKey.roomEventKey);
}
replyFenceGeneration = beginTelegramReplyFence({
key: replyFenceKey,
supersede: shouldSupersedeTelegramReplyFence(ctxPayload),
key: replyFenceKey.activeKey,
supersede: supersedeReplyFence,
});
const implicitQuoteReplyTargetId =
@@ -875,6 +897,7 @@ export const dispatchTelegramMessage = async ({
outboundAccountId: route.accountId,
markInboundTurnDelivered: () => deliveryState.markDelivered(),
},
{ inboundTurnKind: ctxPayload.InboundTurnKind },
);
const clearGroupHistory = () => {
if (isGroup && historyKey) {

View File

@@ -184,6 +184,7 @@ export const telegramMessageActions: ChannelMessageActionAdapter = {
accountId,
mediaLocalRoots,
sessionKey,
inboundTurnKind,
toolContext,
}) => {
const telegramAction = resolveTelegramMessageActionName(action);
@@ -202,7 +203,7 @@ export const telegramMessageActions: ChannelMessageActionAdapter = {
: {}),
},
cfg,
{ mediaLocalRoots, sessionKey },
{ mediaLocalRoots, sessionKey, inboundTurnKind },
);
},
};

View File

@@ -45,4 +45,43 @@ describe("telegram inbound turn delivery", () => {
expect(count).toBe(0);
end();
});
it("keeps user-request and room-event delivery correlations separate", () => {
let userRequestCount = 0;
let roomEventCount = 0;
const endUserRequest = beginTelegramInboundTurnDeliveryCorrelation("sess:x", {
outboundTo: "999",
markInboundTurnDelivered: () => {
userRequestCount += 1;
},
});
const endRoomEvent = beginTelegramInboundTurnDeliveryCorrelation(
"sess:x",
{
outboundTo: "999",
markInboundTurnDelivered: () => {
roomEventCount += 1;
},
},
{ inboundTurnKind: "room_event" },
);
notifyTelegramInboundTurnOutboundSuccess({
sessionKey: "sess:x",
to: "999",
inboundTurnKind: "room_event",
});
expect(roomEventCount).toBe(1);
expect(userRequestCount).toBe(0);
notifyTelegramInboundTurnOutboundSuccess({
sessionKey: "sess:x",
to: "999",
});
expect(roomEventCount).toBe(1);
expect(userRequestCount).toBe(1);
endRoomEvent();
endUserRequest();
});
});

View File

@@ -1,4 +1,5 @@
export type TelegramInboundTurnDeliveryEnd = () => void;
export type TelegramInboundTurnDeliveryKind = "user_request" | "room_event";
type ActiveTurn = {
outboundTo: string;
@@ -8,11 +9,26 @@ type ActiveTurn = {
const registry = new Map<string, ActiveTurn>();
export function resolveTelegramInboundTurnDeliveryCorrelationKey(
sessionKey: string | undefined,
inboundTurnKind?: TelegramInboundTurnDeliveryKind | string,
): string | undefined {
const key = sessionKey?.trim();
if (!key) {
return undefined;
}
return inboundTurnKind === "room_event" ? `${key}:room_event` : key;
}
export function beginTelegramInboundTurnDeliveryCorrelation(
sessionKey: string | undefined,
turn: ActiveTurn,
options?: { inboundTurnKind?: TelegramInboundTurnDeliveryKind | string },
): TelegramInboundTurnDeliveryEnd {
const key = sessionKey?.trim();
const key = resolveTelegramInboundTurnDeliveryCorrelationKey(
sessionKey,
options?.inboundTurnKind,
);
if (!key) {
return () => {};
}
@@ -26,8 +42,12 @@ export function notifyTelegramInboundTurnOutboundSuccess(params: {
sessionKey: string | undefined;
to: string;
accountId?: string | null;
inboundTurnKind?: TelegramInboundTurnDeliveryKind | string;
}): void {
const key = params.sessionKey?.trim();
const key = resolveTelegramInboundTurnDeliveryCorrelationKey(
params.sessionKey,
params.inboundTurnKind,
);
if (!key) {
return;
}

View File

@@ -1,4 +1,5 @@
import type { SourceReplyDeliveryMode } from "../auto-reply/get-reply-options.types.js";
import type { InboundTurnKind } from "../channels/turn/kind.js";
import { selectApplicableRuntimeConfig } from "../config/config.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { callGateway } from "../gateway/call.js";
@@ -120,6 +121,7 @@ export function createOpenClawTools(
requireExplicitMessageTarget?: boolean;
/** Visible source replies must be sent through the message tool when set to message_tool_only. */
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
inboundTurnKind?: InboundTurnKind;
/** If true, omit the message tool from the tool list. */
disableMessageTool?: boolean;
/** If true, include the heartbeat response tool for structured heartbeat outcomes. */
@@ -299,6 +301,7 @@ export function createOpenClawTools(
sandboxRoot: options?.sandboxRoot,
requireExplicitTarget: options?.requireExplicitMessageTarget,
sourceReplyDeliveryMode: options?.sourceReplyDeliveryMode,
inboundTurnKind: options?.inboundTurnKind,
requesterSenderId: options?.requesterSenderId ?? undefined,
senderIsOwner: options?.senderIsOwner,
});

View File

@@ -1111,6 +1111,7 @@ export async function runEmbeddedAttempt(
requireExplicitMessageTarget:
params.requireExplicitMessageTarget ?? isSubagentSessionKey(params.sessionKey),
sourceReplyDeliveryMode: params.sourceReplyDeliveryMode,
inboundTurnKind: params.currentTurnKind,
disableMessageTool: params.disableMessageTool,
forceMessageTool: params.forceMessageTool,
enableHeartbeatTool: params.enableHeartbeatTool,

View File

@@ -1,6 +1,7 @@
import { createCodingTools, createReadTool } from "@earendil-works/pi-coding-agent";
import type { SourceReplyDeliveryMode } from "../auto-reply/get-reply-options.types.js";
import { HEARTBEAT_RESPONSE_TOOL_NAME } from "../auto-reply/heartbeat-tool-response.js";
import type { InboundTurnKind } from "../channels/turn/kind.js";
import { resolveExecCommandHighlighting } from "../config/exec-command-highlighting.js";
import type { ModelCompatConfig } from "../config/types.models.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
@@ -409,6 +410,7 @@ export function createOpenClawCodingTools(options?: {
requireExplicitMessageTarget?: boolean;
/** Visible source replies must be sent through the message tool when set to message_tool_only. */
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
inboundTurnKind?: InboundTurnKind;
/** If true, omit the message tool from the tool list. */
disableMessageTool?: boolean;
/** Keep the message tool available even when the selected profile omits it. */
@@ -908,6 +910,7 @@ export function createOpenClawCodingTools(options?: {
modelHasVision: options?.modelHasVision,
requireExplicitMessageTarget: options?.requireExplicitMessageTarget,
sourceReplyDeliveryMode: options?.sourceReplyDeliveryMode,
inboundTurnKind: options?.inboundTurnKind,
disableMessageTool: options?.disableMessageTool,
enableHeartbeatTool,
disablePluginTools: !includePluginTools,

View File

@@ -11,6 +11,7 @@ import {
import { CHANNEL_MESSAGE_ACTION_NAMES } from "../../channels/plugins/message-action-names.js";
import type { ChannelMessageCapability } from "../../channels/plugins/message-capabilities.js";
import type { ChannelMessageActionName } from "../../channels/plugins/types.public.js";
import type { InboundTurnKind } from "../../channels/turn/kind.js";
import { resolveCommandSecretRefsViaGateway } from "../../cli/command-secret-gateway.js";
import { getScopedChannelsCommandSecretTargets } from "../../cli/command-secret-targets.js";
import { resolveMessageSecretScope } from "../../cli/message-secret-scope.js";
@@ -570,6 +571,7 @@ type MessageToolOptions = {
sandboxRoot?: string;
requireExplicitTarget?: boolean;
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
inboundTurnKind?: InboundTurnKind;
requesterSenderId?: string;
senderIsOwner?: boolean;
};
@@ -960,6 +962,7 @@ export function createMessageTool(options?: MessageToolOptions): AnyAgentTool {
agentId: resolvedAgentId,
sandboxRoot: options?.sandboxRoot,
sourceReplyDeliveryMode: options?.sourceReplyDeliveryMode,
inboundTurnKind: options?.inboundTurnKind,
abortSignal: signal,
});

View File

@@ -9,6 +9,7 @@ import type { MessagePresentation } from "../../interactive/payload.js";
import type { OutboundMediaAccess } from "../../media/load-options.js";
import type { PollInput } from "../../polls.js";
import type { ChatType } from "../chat-type.js";
import type { InboundTurnKind } from "../turn/kind.js";
import type { ChannelId } from "./channel-id.types.js";
import type { ChannelMessageActionName as ChannelMessageActionNameFromList } from "./message-action-names.js";
import type { ChannelMessageCapability } from "./message-capabilities.js";
@@ -681,6 +682,7 @@ export type ChannelMessageActionContext = {
senderIsOwner?: boolean;
sessionKey?: string | null;
sessionId?: string | null;
inboundTurnKind?: InboundTurnKind;
agentId?: string | null;
gateway?: {
url?: string;

View File

@@ -16,6 +16,7 @@ import type {
ChannelMessageActionName,
ChannelThreadingToolContext,
} from "../../channels/plugins/types.public.js";
import type { InboundTurnKind } from "../../channels/turn/kind.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import {
hasInteractiveReplyBlocks,
@@ -121,6 +122,7 @@ export type RunMessageActionParams = {
sandboxRoot?: string;
dryRun?: boolean;
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
inboundTurnKind?: InboundTurnKind;
abortSignal?: AbortSignal;
};
@@ -556,6 +558,7 @@ async function runGatewayPluginMessageActionOrNull(params: {
senderIsOwner: params.input.senderIsOwner,
sessionKey: params.input.sessionKey,
sessionId: params.input.sessionId,
inboundTurnKind: params.input.inboundTurnKind,
agentId: params.agentId,
toolContext: params.input.toolContext,
idempotencyKey: await resolveGatewayActionIdempotencyKey(
@@ -937,6 +940,7 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise<MessageActi
accountId: accountId ?? undefined,
senderIsOwner: input.senderIsOwner,
sessionId: input.sessionId,
inboundTurnKind: input.inboundTurnKind,
gateway,
toolContext: input.toolContext,
deps: input.deps,

View File

@@ -6,6 +6,7 @@ import type {
ChannelMessageActionContext,
ChannelThreadingToolContext,
} from "../../channels/plugins/types.public.js";
import type { InboundTurnKind } from "../../channels/turn/kind.js";
import { appendAssistantMessageToSessionTranscript } from "../../config/sessions.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import type { OutboundMediaAccess, OutboundMediaReadFile } from "../../media/load-options.js";
@@ -45,6 +46,7 @@ export type OutboundSendContext = {
accountId?: string | null;
senderIsOwner?: boolean;
sessionId?: string;
inboundTurnKind?: InboundTurnKind;
gateway?: OutboundGatewayContext;
toolContext?: ChannelThreadingToolContext;
deps?: OutboundSendDeps;
@@ -159,6 +161,7 @@ async function tryHandleWithPluginAction(params: {
senderIsOwner: params.ctx.senderIsOwner,
sessionKey: params.ctx.sessionKey,
sessionId: params.ctx.sessionId,
inboundTurnKind: params.ctx.inboundTurnKind,
agentId: params.ctx.agentId,
gateway: params.ctx.gateway,
toolContext: params.ctx.toolContext,