From d42ef2ac62166eb9fac359e88bca4447de28e82e Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 25 Feb 2026 02:15:54 +0000 Subject: [PATCH] refactor: consolidate typing lifecycle and queue policy --- .../matrix/src/matrix/monitor/handler.ts | 4 +- .../mattermost/src/mattermost/monitor.ts | 4 +- extensions/msteams/src/reply-dispatcher.ts | 4 +- src/auto-reply/reply/agent-runner.ts | 12 +- src/auto-reply/reply/queue-policy.test.ts | 48 ++++ src/auto-reply/reply/queue-policy.ts | 21 ++ src/auto-reply/reply/reply-dispatcher.ts | 15 +- src/auto-reply/reply/typing.ts | 23 +- src/channels/channel-helpers.test.ts | 237 ------------------ src/channels/conversation-label.test.ts | 71 ++++++ src/channels/registry.helpers.test.ts | 42 ++++ src/channels/targets.test.ts | 39 +++ src/channels/typing-lifecycle.ts | 55 ++++ src/channels/typing.test.ts | 88 +++++++ src/channels/typing.ts | 33 +-- .../monitor/message-handler.process.ts | 3 +- src/signal/monitor/event-handler.ts | 4 +- src/slack/monitor/message-handler/dispatch.ts | 4 +- src/telegram/bot-message-dispatch.ts | 4 +- 19 files changed, 410 insertions(+), 301 deletions(-) create mode 100644 src/auto-reply/reply/queue-policy.test.ts create mode 100644 src/auto-reply/reply/queue-policy.ts delete mode 100644 src/channels/channel-helpers.test.ts create mode 100644 src/channels/conversation-label.test.ts create mode 100644 src/channels/registry.helpers.test.ts create mode 100644 src/channels/targets.test.ts create mode 100644 src/channels/typing-lifecycle.ts create mode 100644 src/channels/typing.test.ts diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index f62ef60ce3b..77e88162af3 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -635,6 +635,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam core.channel.reply.createReplyDispatcherWithTyping({ ...prefixOptions, humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId), + typingCallbacks, deliver: async (payload) => { await deliverMatrixReplies({ replies: [payload], @@ -652,9 +653,6 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam onError: (err, info) => { runtime.error?.(`matrix ${info.kind} reply failed: ${String(err)}`); }, - onReplyStart: typingCallbacks.onReplyStart, - onIdle: typingCallbacks.onIdle, - onCleanup: typingCallbacks.onCleanup, }); const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 233aee17d1b..6056c3fef15 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -768,6 +768,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} core.channel.reply.createReplyDispatcherWithTyping({ ...prefixOptions, humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId), + typingCallbacks, deliver: async (payload: ReplyPayload) => { const mediaUrls = payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); const text = core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode); @@ -804,9 +805,6 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} onError: (err, info) => { runtime.error?.(`mattermost ${info.kind} reply failed: ${String(err)}`); }, - onReplyStart: typingCallbacks.onReplyStart, - onIdle: typingCallbacks.onIdle, - onCleanup: typingCallbacks.onCleanup, }); await core.channel.reply.dispatchReplyFromConfig({ diff --git a/extensions/msteams/src/reply-dispatcher.ts b/extensions/msteams/src/reply-dispatcher.ts index 755ae3e56e1..36d611c39da 100644 --- a/extensions/msteams/src/reply-dispatcher.ts +++ b/extensions/msteams/src/reply-dispatcher.ts @@ -68,6 +68,7 @@ export function createMSTeamsReplyDispatcher(params: { core.channel.reply.createReplyDispatcherWithTyping({ ...prefixOptions, humanDelay: core.channel.reply.resolveHumanDelayConfig(params.cfg, params.agentId), + typingCallbacks, deliver: async (payload) => { const tableMode = core.channel.text.resolveMarkdownTableMode({ cfg: params.cfg, @@ -121,9 +122,6 @@ export function createMSTeamsReplyDispatcher(params: { hint, }); }, - onReplyStart: typingCallbacks.onReplyStart, - onIdle: typingCallbacks.onIdle, - onCleanup: typingCallbacks.onCleanup, }); return { diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 49e7408357e..8628fe33a51 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -51,6 +51,7 @@ import { readSessionMessages, } from "./post-compaction-audit.js"; import { readPostCompactionContext } from "./post-compaction-context.js"; +import { resolveActiveRunQueueAction } from "./queue-policy.js"; import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js"; import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js"; import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-run-accounting.js"; @@ -235,12 +236,19 @@ export async function runReplyAgent(params: { } } - if (isHeartbeat && isActive) { + const activeRunQueueAction = resolveActiveRunQueueAction({ + isActive, + isHeartbeat, + shouldFollowup, + queueMode: resolvedQueue.mode, + }); + + if (activeRunQueueAction === "drop") { typing.cleanup(); return undefined; } - if (isActive && (shouldFollowup || resolvedQueue.mode === "steer")) { + if (activeRunQueueAction === "enqueue-followup") { enqueueFollowupRun(queueKey, followupRun, resolvedQueue); await touchActiveSessionEntry(); typing.cleanup(); diff --git a/src/auto-reply/reply/queue-policy.test.ts b/src/auto-reply/reply/queue-policy.test.ts new file mode 100644 index 00000000000..265cc19ff9b --- /dev/null +++ b/src/auto-reply/reply/queue-policy.test.ts @@ -0,0 +1,48 @@ +import { describe, expect, it } from "vitest"; +import { resolveActiveRunQueueAction } from "./queue-policy.js"; + +describe("resolveActiveRunQueueAction", () => { + it("runs immediately when there is no active run", () => { + expect( + resolveActiveRunQueueAction({ + isActive: false, + isHeartbeat: false, + shouldFollowup: true, + queueMode: "collect", + }), + ).toBe("run-now"); + }); + + it("drops heartbeat runs while another run is active", () => { + expect( + resolveActiveRunQueueAction({ + isActive: true, + isHeartbeat: true, + shouldFollowup: true, + queueMode: "collect", + }), + ).toBe("drop"); + }); + + it("enqueues followups for non-heartbeat active runs", () => { + expect( + resolveActiveRunQueueAction({ + isActive: true, + isHeartbeat: false, + shouldFollowup: true, + queueMode: "collect", + }), + ).toBe("enqueue-followup"); + }); + + it("enqueues steer mode runs while active", () => { + expect( + resolveActiveRunQueueAction({ + isActive: true, + isHeartbeat: false, + shouldFollowup: false, + queueMode: "steer", + }), + ).toBe("enqueue-followup"); + }); +}); diff --git a/src/auto-reply/reply/queue-policy.ts b/src/auto-reply/reply/queue-policy.ts new file mode 100644 index 00000000000..73fc48bdcc6 --- /dev/null +++ b/src/auto-reply/reply/queue-policy.ts @@ -0,0 +1,21 @@ +import type { QueueSettings } from "./queue.js"; + +export type ActiveRunQueueAction = "run-now" | "enqueue-followup" | "drop"; + +export function resolveActiveRunQueueAction(params: { + isActive: boolean; + isHeartbeat: boolean; + shouldFollowup: boolean; + queueMode: QueueSettings["mode"]; +}): ActiveRunQueueAction { + if (!params.isActive) { + return "run-now"; + } + if (params.isHeartbeat) { + return "drop"; + } + if (params.shouldFollowup || params.queueMode === "steer") { + return "enqueue-followup"; + } + return "run-now"; +} diff --git a/src/auto-reply/reply/reply-dispatcher.ts b/src/auto-reply/reply/reply-dispatcher.ts index bfc5fa20f0f..5c015dcd557 100644 --- a/src/auto-reply/reply/reply-dispatcher.ts +++ b/src/auto-reply/reply/reply-dispatcher.ts @@ -1,3 +1,4 @@ +import type { TypingCallbacks } from "../../channels/typing.js"; import type { HumanDelayConfig } from "../../config/types.js"; import { sleep } from "../../utils.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; @@ -57,6 +58,7 @@ export type ReplyDispatcherOptions = { }; export type ReplyDispatcherWithTypingOptions = Omit & { + typingCallbacks?: TypingCallbacks; onReplyStart?: () => Promise | void; onIdle?: () => void; /** Called when the typing controller is cleaned up (e.g., on NO_REPLY). */ @@ -209,28 +211,31 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis export function createReplyDispatcherWithTyping( options: ReplyDispatcherWithTypingOptions, ): ReplyDispatcherWithTypingResult { - const { onReplyStart, onIdle, onCleanup, ...dispatcherOptions } = options; + const { typingCallbacks, onReplyStart, onIdle, onCleanup, ...dispatcherOptions } = options; + const resolvedOnReplyStart = onReplyStart ?? typingCallbacks?.onReplyStart; + const resolvedOnIdle = onIdle ?? typingCallbacks?.onIdle; + const resolvedOnCleanup = onCleanup ?? typingCallbacks?.onCleanup; let typingController: TypingController | undefined; const dispatcher = createReplyDispatcher({ ...dispatcherOptions, onIdle: () => { typingController?.markDispatchIdle(); - onIdle?.(); + resolvedOnIdle?.(); }, }); return { dispatcher, replyOptions: { - onReplyStart, - onTypingCleanup: onCleanup, + onReplyStart: resolvedOnReplyStart, + onTypingCleanup: resolvedOnCleanup, onTypingController: (typing) => { typingController = typing; }, }, markDispatchIdle: () => { typingController?.markDispatchIdle(); - onIdle?.(); + resolvedOnIdle?.(); }, }; } diff --git a/src/auto-reply/reply/typing.ts b/src/auto-reply/reply/typing.ts index ececcc2fb84..fee32418050 100644 --- a/src/auto-reply/reply/typing.ts +++ b/src/auto-reply/reply/typing.ts @@ -1,3 +1,4 @@ +import { createTypingKeepaliveLoop } from "../../channels/typing-lifecycle.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; export type TypingController = { @@ -35,7 +36,6 @@ export function createTypingController(params: { // especially when upstream event emitters don't await async listeners. // Once we stop typing, we "seal" the controller so late events can't restart typing forever. let sealed = false; - let typingTimer: NodeJS.Timeout | undefined; let typingTtlTimer: NodeJS.Timeout | undefined; const typingIntervalMs = typingIntervalSeconds * 1000; @@ -61,10 +61,7 @@ export function createTypingController(params: { clearTimeout(typingTtlTimer); typingTtlTimer = undefined; } - if (typingTimer) { - clearInterval(typingTimer); - typingTimer = undefined; - } + typingLoop.stop(); // Notify the channel to stop its typing indicator (e.g., on NO_REPLY). // This fires only once (sealed prevents re-entry). if (active) { @@ -88,7 +85,7 @@ export function createTypingController(params: { clearTimeout(typingTtlTimer); } typingTtlTimer = setTimeout(() => { - if (!typingTimer) { + if (!typingLoop.isRunning()) { return; } log?.(`typing TTL reached (${formatTypingTtl(typingTtlMs)}); stopping typing indicator`); @@ -105,6 +102,11 @@ export function createTypingController(params: { await onReplyStart?.(); }; + const typingLoop = createTypingKeepaliveLoop({ + intervalMs: typingIntervalMs, + onTick: triggerTyping, + }); + const ensureStart = async () => { if (sealed) { return; @@ -146,16 +148,11 @@ export function createTypingController(params: { if (!onReplyStart) { return; } - if (typingIntervalMs <= 0) { - return; - } - if (typingTimer) { + if (typingLoop.isRunning()) { return; } await ensureStart(); - typingTimer = setInterval(() => { - void triggerTyping(); - }, typingIntervalMs); + typingLoop.start(); }; const startTypingOnText = async (text?: string) => { diff --git a/src/channels/channel-helpers.test.ts b/src/channels/channel-helpers.test.ts deleted file mode 100644 index 34bd5370526..00000000000 --- a/src/channels/channel-helpers.test.ts +++ /dev/null @@ -1,237 +0,0 @@ -import { describe, expect, it, vi } from "vitest"; -import type { MsgContext } from "../auto-reply/templating.js"; -import { resolveConversationLabel } from "./conversation-label.js"; -import { - formatChannelSelectionLine, - listChatChannels, - normalizeChatChannelId, -} from "./registry.js"; -import { buildMessagingTarget, ensureTargetId, requireTargetKind } from "./targets.js"; -import { createTypingCallbacks } from "./typing.js"; - -const flushMicrotasks = async () => { - await Promise.resolve(); - await Promise.resolve(); -}; - -describe("channel registry helpers", () => { - it("normalizes aliases + trims whitespace", () => { - expect(normalizeChatChannelId(" imsg ")).toBe("imessage"); - expect(normalizeChatChannelId("gchat")).toBe("googlechat"); - expect(normalizeChatChannelId("google-chat")).toBe("googlechat"); - expect(normalizeChatChannelId("internet-relay-chat")).toBe("irc"); - expect(normalizeChatChannelId("telegram")).toBe("telegram"); - expect(normalizeChatChannelId("web")).toBeNull(); - expect(normalizeChatChannelId("nope")).toBeNull(); - }); - - it("keeps Telegram first in the default order", () => { - const channels = listChatChannels(); - expect(channels[0]?.id).toBe("telegram"); - }); - - it("does not include MS Teams by default", () => { - const channels = listChatChannels(); - expect(channels.some((channel) => channel.id === "msteams")).toBe(false); - }); - - it("formats selection lines with docs labels + website extras", () => { - const channels = listChatChannels(); - const first = channels[0]; - if (!first) { - throw new Error("Missing channel metadata."); - } - const line = formatChannelSelectionLine(first, (path, label) => - [label, path].filter(Boolean).join(":"), - ); - expect(line).not.toContain("Docs:"); - expect(line).toContain("/channels/telegram"); - expect(line).toContain("https://openclaw.ai"); - }); -}); - -describe("channel targets", () => { - it("ensureTargetId returns the candidate when it matches", () => { - expect( - ensureTargetId({ - candidate: "U123", - pattern: /^[A-Z0-9]+$/i, - errorMessage: "bad", - }), - ).toBe("U123"); - }); - - it("ensureTargetId throws with the provided message on mismatch", () => { - expect(() => - ensureTargetId({ - candidate: "not-ok", - pattern: /^[A-Z0-9]+$/i, - errorMessage: "Bad target", - }), - ).toThrow(/Bad target/); - }); - - it("requireTargetKind returns the target id when the kind matches", () => { - const target = buildMessagingTarget("channel", "C123", "C123"); - expect(requireTargetKind({ platform: "Slack", target, kind: "channel" })).toBe("C123"); - }); - - it("requireTargetKind throws when the kind is missing or mismatched", () => { - expect(() => - requireTargetKind({ platform: "Slack", target: undefined, kind: "channel" }), - ).toThrow(/Slack channel id is required/); - const target = buildMessagingTarget("user", "U123", "U123"); - expect(() => requireTargetKind({ platform: "Slack", target, kind: "channel" })).toThrow( - /Slack channel id is required/, - ); - }); -}); - -describe("resolveConversationLabel", () => { - const cases: Array<{ name: string; ctx: MsgContext; expected: string }> = [ - { - name: "prefers ConversationLabel when present", - ctx: { ConversationLabel: "Pinned Label", ChatType: "group" }, - expected: "Pinned Label", - }, - { - name: "prefers ThreadLabel over derived chat labels", - ctx: { - ThreadLabel: "Thread Alpha", - ChatType: "group", - GroupSubject: "Ops", - From: "telegram:group:42", - }, - expected: "Thread Alpha", - }, - { - name: "uses SenderName for direct chats when available", - ctx: { ChatType: "direct", SenderName: "Ada", From: "telegram:99" }, - expected: "Ada", - }, - { - name: "falls back to From for direct chats when SenderName is missing", - ctx: { ChatType: "direct", From: "telegram:99" }, - expected: "telegram:99", - }, - { - name: "derives Telegram-like group labels with numeric id suffix", - ctx: { ChatType: "group", GroupSubject: "Ops", From: "telegram:group:42" }, - expected: "Ops id:42", - }, - { - name: "does not append ids for #rooms/channels", - ctx: { - ChatType: "channel", - GroupSubject: "#general", - From: "slack:channel:C123", - }, - expected: "#general", - }, - { - name: "does not append ids when the base already contains the id", - ctx: { - ChatType: "group", - GroupSubject: "Family id:123@g.us", - From: "whatsapp:group:123@g.us", - }, - expected: "Family id:123@g.us", - }, - { - name: "appends ids for WhatsApp-like group ids when a subject exists", - ctx: { - ChatType: "group", - GroupSubject: "Family", - From: "whatsapp:group:123@g.us", - }, - expected: "Family id:123@g.us", - }, - ]; - - for (const testCase of cases) { - it(testCase.name, () => { - expect(resolveConversationLabel(testCase.ctx)).toBe(testCase.expected); - }); - } -}); - -describe("createTypingCallbacks", () => { - it("invokes start on reply start", async () => { - const start = vi.fn().mockResolvedValue(undefined); - const onStartError = vi.fn(); - const callbacks = createTypingCallbacks({ start, onStartError }); - - await callbacks.onReplyStart(); - - expect(start).toHaveBeenCalledTimes(1); - expect(onStartError).not.toHaveBeenCalled(); - }); - - it("reports start errors", async () => { - const start = vi.fn().mockRejectedValue(new Error("fail")); - const onStartError = vi.fn(); - const callbacks = createTypingCallbacks({ start, onStartError }); - - await callbacks.onReplyStart(); - - expect(onStartError).toHaveBeenCalledTimes(1); - }); - - it("invokes stop on idle and reports stop errors", async () => { - const start = vi.fn().mockResolvedValue(undefined); - const stop = vi.fn().mockRejectedValue(new Error("stop")); - const onStartError = vi.fn(); - const onStopError = vi.fn(); - const callbacks = createTypingCallbacks({ start, stop, onStartError, onStopError }); - - callbacks.onIdle?.(); - await flushMicrotasks(); - - expect(stop).toHaveBeenCalledTimes(1); - expect(onStopError).toHaveBeenCalledTimes(1); - }); - - it("sends typing keepalive pings until idle cleanup", async () => { - vi.useFakeTimers(); - try { - const start = vi.fn().mockResolvedValue(undefined); - const stop = vi.fn().mockResolvedValue(undefined); - const onStartError = vi.fn(); - const callbacks = createTypingCallbacks({ start, stop, onStartError }); - - await callbacks.onReplyStart(); - expect(start).toHaveBeenCalledTimes(1); - - await vi.advanceTimersByTimeAsync(2_999); - expect(start).toHaveBeenCalledTimes(1); - - await vi.advanceTimersByTimeAsync(1); - expect(start).toHaveBeenCalledTimes(2); - - await vi.advanceTimersByTimeAsync(3_000); - expect(start).toHaveBeenCalledTimes(3); - - callbacks.onIdle?.(); - await flushMicrotasks(); - expect(stop).toHaveBeenCalledTimes(1); - - await vi.advanceTimersByTimeAsync(9_000); - expect(start).toHaveBeenCalledTimes(3); - } finally { - vi.useRealTimers(); - } - }); - - it("deduplicates stop across idle and cleanup", async () => { - const start = vi.fn().mockResolvedValue(undefined); - const stop = vi.fn().mockResolvedValue(undefined); - const onStartError = vi.fn(); - const callbacks = createTypingCallbacks({ start, stop, onStartError }); - - callbacks.onIdle?.(); - callbacks.onCleanup?.(); - await flushMicrotasks(); - - expect(stop).toHaveBeenCalledTimes(1); - }); -}); diff --git a/src/channels/conversation-label.test.ts b/src/channels/conversation-label.test.ts new file mode 100644 index 00000000000..9d9e042ad0c --- /dev/null +++ b/src/channels/conversation-label.test.ts @@ -0,0 +1,71 @@ +import { describe, expect, it } from "vitest"; +import type { MsgContext } from "../auto-reply/templating.js"; +import { resolveConversationLabel } from "./conversation-label.js"; + +describe("resolveConversationLabel", () => { + const cases: Array<{ name: string; ctx: MsgContext; expected: string }> = [ + { + name: "prefers ConversationLabel when present", + ctx: { ConversationLabel: "Pinned Label", ChatType: "group" }, + expected: "Pinned Label", + }, + { + name: "prefers ThreadLabel over derived chat labels", + ctx: { + ThreadLabel: "Thread Alpha", + ChatType: "group", + GroupSubject: "Ops", + From: "telegram:group:42", + }, + expected: "Thread Alpha", + }, + { + name: "uses SenderName for direct chats when available", + ctx: { ChatType: "direct", SenderName: "Ada", From: "telegram:99" }, + expected: "Ada", + }, + { + name: "falls back to From for direct chats when SenderName is missing", + ctx: { ChatType: "direct", From: "telegram:99" }, + expected: "telegram:99", + }, + { + name: "derives Telegram-like group labels with numeric id suffix", + ctx: { ChatType: "group", GroupSubject: "Ops", From: "telegram:group:42" }, + expected: "Ops id:42", + }, + { + name: "does not append ids for #rooms/channels", + ctx: { + ChatType: "channel", + GroupSubject: "#general", + From: "slack:channel:C123", + }, + expected: "#general", + }, + { + name: "does not append ids when the base already contains the id", + ctx: { + ChatType: "group", + GroupSubject: "Family id:123@g.us", + From: "whatsapp:group:123@g.us", + }, + expected: "Family id:123@g.us", + }, + { + name: "appends ids for WhatsApp-like group ids when a subject exists", + ctx: { + ChatType: "group", + GroupSubject: "Family", + From: "whatsapp:group:123@g.us", + }, + expected: "Family id:123@g.us", + }, + ]; + + for (const testCase of cases) { + it(testCase.name, () => { + expect(resolveConversationLabel(testCase.ctx)).toBe(testCase.expected); + }); + } +}); diff --git a/src/channels/registry.helpers.test.ts b/src/channels/registry.helpers.test.ts new file mode 100644 index 00000000000..3051f33b4fa --- /dev/null +++ b/src/channels/registry.helpers.test.ts @@ -0,0 +1,42 @@ +import { describe, expect, it } from "vitest"; +import { + formatChannelSelectionLine, + listChatChannels, + normalizeChatChannelId, +} from "./registry.js"; + +describe("channel registry helpers", () => { + it("normalizes aliases + trims whitespace", () => { + expect(normalizeChatChannelId(" imsg ")).toBe("imessage"); + expect(normalizeChatChannelId("gchat")).toBe("googlechat"); + expect(normalizeChatChannelId("google-chat")).toBe("googlechat"); + expect(normalizeChatChannelId("internet-relay-chat")).toBe("irc"); + expect(normalizeChatChannelId("telegram")).toBe("telegram"); + expect(normalizeChatChannelId("web")).toBeNull(); + expect(normalizeChatChannelId("nope")).toBeNull(); + }); + + it("keeps Telegram first in the default order", () => { + const channels = listChatChannels(); + expect(channels[0]?.id).toBe("telegram"); + }); + + it("does not include MS Teams by default", () => { + const channels = listChatChannels(); + expect(channels.some((channel) => channel.id === "msteams")).toBe(false); + }); + + it("formats selection lines with docs labels + website extras", () => { + const channels = listChatChannels(); + const first = channels[0]; + if (!first) { + throw new Error("Missing channel metadata."); + } + const line = formatChannelSelectionLine(first, (path, label) => + [label, path].filter(Boolean).join(":"), + ); + expect(line).not.toContain("Docs:"); + expect(line).toContain("/channels/telegram"); + expect(line).toContain("https://openclaw.ai"); + }); +}); diff --git a/src/channels/targets.test.ts b/src/channels/targets.test.ts new file mode 100644 index 00000000000..cea39999836 --- /dev/null +++ b/src/channels/targets.test.ts @@ -0,0 +1,39 @@ +import { describe, expect, it } from "vitest"; +import { buildMessagingTarget, ensureTargetId, requireTargetKind } from "./targets.js"; + +describe("channel targets", () => { + it("ensureTargetId returns the candidate when it matches", () => { + expect( + ensureTargetId({ + candidate: "U123", + pattern: /^[A-Z0-9]+$/i, + errorMessage: "bad", + }), + ).toBe("U123"); + }); + + it("ensureTargetId throws with the provided message on mismatch", () => { + expect(() => + ensureTargetId({ + candidate: "not-ok", + pattern: /^[A-Z0-9]+$/i, + errorMessage: "Bad target", + }), + ).toThrow(/Bad target/); + }); + + it("requireTargetKind returns the target id when the kind matches", () => { + const target = buildMessagingTarget("channel", "C123", "C123"); + expect(requireTargetKind({ platform: "Slack", target, kind: "channel" })).toBe("C123"); + }); + + it("requireTargetKind throws when the kind is missing or mismatched", () => { + expect(() => + requireTargetKind({ platform: "Slack", target: undefined, kind: "channel" }), + ).toThrow(/Slack channel id is required/); + const target = buildMessagingTarget("user", "U123", "U123"); + expect(() => requireTargetKind({ platform: "Slack", target, kind: "channel" })).toThrow( + /Slack channel id is required/, + ); + }); +}); diff --git a/src/channels/typing-lifecycle.ts b/src/channels/typing-lifecycle.ts new file mode 100644 index 00000000000..68cab9113ae --- /dev/null +++ b/src/channels/typing-lifecycle.ts @@ -0,0 +1,55 @@ +type AsyncTick = () => Promise | void; + +export type TypingKeepaliveLoop = { + tick: () => Promise; + start: () => void; + stop: () => void; + isRunning: () => boolean; +}; + +export function createTypingKeepaliveLoop(params: { + intervalMs: number; + onTick: AsyncTick; +}): TypingKeepaliveLoop { + let timer: ReturnType | undefined; + let tickInFlight = false; + + const tick = async () => { + if (tickInFlight) { + return; + } + tickInFlight = true; + try { + await params.onTick(); + } finally { + tickInFlight = false; + } + }; + + const start = () => { + if (params.intervalMs <= 0 || timer) { + return; + } + timer = setInterval(() => { + void tick(); + }, params.intervalMs); + }; + + const stop = () => { + if (!timer) { + return; + } + clearInterval(timer); + timer = undefined; + tickInFlight = false; + }; + + const isRunning = () => timer !== undefined; + + return { + tick, + start, + stop, + isRunning, + }; +} diff --git a/src/channels/typing.test.ts b/src/channels/typing.test.ts new file mode 100644 index 00000000000..c1f314183b8 --- /dev/null +++ b/src/channels/typing.test.ts @@ -0,0 +1,88 @@ +import { describe, expect, it, vi } from "vitest"; +import { createTypingCallbacks } from "./typing.js"; + +const flushMicrotasks = async () => { + await Promise.resolve(); + await Promise.resolve(); +}; + +describe("createTypingCallbacks", () => { + it("invokes start on reply start", async () => { + const start = vi.fn().mockResolvedValue(undefined); + const onStartError = vi.fn(); + const callbacks = createTypingCallbacks({ start, onStartError }); + + await callbacks.onReplyStart(); + + expect(start).toHaveBeenCalledTimes(1); + expect(onStartError).not.toHaveBeenCalled(); + }); + + it("reports start errors", async () => { + const start = vi.fn().mockRejectedValue(new Error("fail")); + const onStartError = vi.fn(); + const callbacks = createTypingCallbacks({ start, onStartError }); + + await callbacks.onReplyStart(); + + expect(onStartError).toHaveBeenCalledTimes(1); + }); + + it("invokes stop on idle and reports stop errors", async () => { + const start = vi.fn().mockResolvedValue(undefined); + const stop = vi.fn().mockRejectedValue(new Error("stop")); + const onStartError = vi.fn(); + const onStopError = vi.fn(); + const callbacks = createTypingCallbacks({ start, stop, onStartError, onStopError }); + + callbacks.onIdle?.(); + await flushMicrotasks(); + + expect(stop).toHaveBeenCalledTimes(1); + expect(onStopError).toHaveBeenCalledTimes(1); + }); + + it("sends typing keepalive pings until idle cleanup", async () => { + vi.useFakeTimers(); + try { + const start = vi.fn().mockResolvedValue(undefined); + const stop = vi.fn().mockResolvedValue(undefined); + const onStartError = vi.fn(); + const callbacks = createTypingCallbacks({ start, stop, onStartError }); + + await callbacks.onReplyStart(); + expect(start).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(2_999); + expect(start).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(1); + expect(start).toHaveBeenCalledTimes(2); + + await vi.advanceTimersByTimeAsync(3_000); + expect(start).toHaveBeenCalledTimes(3); + + callbacks.onIdle?.(); + await flushMicrotasks(); + expect(stop).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(9_000); + expect(start).toHaveBeenCalledTimes(3); + } finally { + vi.useRealTimers(); + } + }); + + it("deduplicates stop across idle and cleanup", async () => { + const start = vi.fn().mockResolvedValue(undefined); + const stop = vi.fn().mockResolvedValue(undefined); + const onStartError = vi.fn(); + const callbacks = createTypingCallbacks({ start, stop, onStartError }); + + callbacks.onIdle?.(); + callbacks.onCleanup?.(); + await flushMicrotasks(); + + expect(stop).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/channels/typing.ts b/src/channels/typing.ts index f6d60d498d1..b701dfb72cd 100644 --- a/src/channels/typing.ts +++ b/src/channels/typing.ts @@ -1,3 +1,5 @@ +import { createTypingKeepaliveLoop } from "./typing-lifecycle.js"; + export type TypingCallbacks = { onReplyStart: () => Promise; onIdle?: () => void; @@ -14,8 +16,6 @@ export function createTypingCallbacks(params: { }): TypingCallbacks { const stop = params.stop; const keepaliveIntervalMs = params.keepaliveIntervalMs ?? 3_000; - let keepaliveTimer: ReturnType | undefined; - let keepaliveStartInFlight = false; let stopSent = false; const fireStart = async () => { @@ -26,35 +26,20 @@ export function createTypingCallbacks(params: { } }; - const clearKeepalive = () => { - if (!keepaliveTimer) { - return; - } - clearInterval(keepaliveTimer); - keepaliveTimer = undefined; - keepaliveStartInFlight = false; - }; + const keepaliveLoop = createTypingKeepaliveLoop({ + intervalMs: keepaliveIntervalMs, + onTick: fireStart, + }); const onReplyStart = async () => { stopSent = false; - clearKeepalive(); + keepaliveLoop.stop(); await fireStart(); - if (keepaliveIntervalMs <= 0) { - return; - } - keepaliveTimer = setInterval(() => { - if (keepaliveStartInFlight) { - return; - } - keepaliveStartInFlight = true; - void fireStart().finally(() => { - keepaliveStartInFlight = false; - }); - }, keepaliveIntervalMs); + keepaliveLoop.start(); }; const fireStop = () => { - clearKeepalive(); + keepaliveLoop.stop(); if (!stop || stopSent) { return; } diff --git a/src/discord/monitor/message-handler.process.ts b/src/discord/monitor/message-handler.process.ts index 1d84cd01410..00124709c74 100644 --- a/src/discord/monitor/message-handler.process.ts +++ b/src/discord/monitor/message-handler.process.ts @@ -569,6 +569,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ ...prefixOptions, humanDelay: resolveHumanDelayConfig(cfg, route.agentId), + typingCallbacks, deliver: async (payload: ReplyPayload, info) => { const isFinal = info.kind === "final"; if (payload.isReasoning) { @@ -669,8 +670,6 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) await typingCallbacks.onReplyStart(); await statusReactions.setThinking(); }, - onIdle: typingCallbacks.onIdle, - onCleanup: typingCallbacks.onCleanup, }); let dispatchResult: Awaited> | null = null; diff --git a/src/signal/monitor/event-handler.ts b/src/signal/monitor/event-handler.ts index 4133930389a..b095626ab46 100644 --- a/src/signal/monitor/event-handler.ts +++ b/src/signal/monitor/event-handler.ts @@ -222,6 +222,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ ...prefixOptions, humanDelay: resolveHumanDelayConfig(deps.cfg, route.agentId), + typingCallbacks, deliver: async (payload) => { await deps.deliverReplies({ replies: [payload], @@ -237,9 +238,6 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { onError: (err, info) => { deps.runtime.error?.(danger(`signal ${info.kind} reply failed: ${String(err)}`)); }, - onReplyStart: typingCallbacks.onReplyStart, - onIdle: typingCallbacks.onIdle, - onCleanup: typingCallbacks.onCleanup, }); const { queuedFinal } = await dispatchInboundMessage({ diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index f6d4b531f61..35db7c2f70e 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -243,6 +243,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ ...prefixOptions, humanDelay: resolveHumanDelayConfig(cfg, route.agentId), + typingCallbacks, deliver: async (payload) => { if (useStreaming) { await deliverWithStreaming(payload); @@ -304,9 +305,6 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag runtime.error?.(danger(`slack ${info.kind} reply failed: ${String(err)}`)); typingCallbacks.onIdle?.(); }, - onReplyStart: typingCallbacks.onReplyStart, - onIdle: typingCallbacks.onIdle, - onCleanup: typingCallbacks.onCleanup, }); const draftStream = createSlackDraftStream({ diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index 89cb59038db..f45b79fb9ab 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -436,6 +436,7 @@ export const dispatchTelegramMessage = async ({ cfg, dispatcherOptions: { ...prefixOptions, + typingCallbacks, deliver: async (payload, info) => { const previewButtons = ( payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined @@ -540,9 +541,6 @@ export const dispatchTelegramMessage = async ({ deliveryState.markNonSilentFailure(); runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`)); }, - onReplyStart: typingCallbacks.onReplyStart, - onIdle: typingCallbacks.onIdle, - onCleanup: typingCallbacks.onCleanup, }, replyOptions: { skillFilter,