diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 482b274e59b..715bd71ad75 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -cf3b7869a6870b51bfab5543a27f5f55a2754c59c268906d33b4da91352ab9bb plugin-sdk-api-baseline.json -6938561c972c419925ac17eb10d5d857502d339603de2cf4ece127827676da5f plugin-sdk-api-baseline.jsonl +10f07ebae3910cfe7639c54fb97ec4011c5f8be8a5444b86d23f075f9a49cc4c plugin-sdk-api-baseline.json +fdc165b2d06f00d195e326c2d28176da5cdeb8f8b05df4ec28466d384d57a07b plugin-sdk-api-baseline.jsonl diff --git a/docs/concepts/typing-indicators.md b/docs/concepts/typing-indicators.md index 13e331c6b61..f41982d2a48 100644 --- a/docs/concepts/typing-indicators.md +++ b/docs/concepts/typing-indicators.md @@ -18,7 +18,8 @@ When `agents.defaults.typingMode` is **unset**, OpenClaw keeps the legacy behavi - **Direct chats**: typing starts immediately once the model loop begins. - **Group chats with a mention**: typing starts immediately. - **Group chats without a mention**: typing starts only when message text begins streaming. -- **Heartbeat runs**: typing is disabled. +- **Heartbeat runs**: typing starts when the heartbeat run begins if the + resolved heartbeat target is a typing-capable chat and typing is not disabled. ## Modes @@ -64,6 +65,11 @@ You can override mode or cadence per session: matched case-insensitively). - `thinking` only fires if the run streams reasoning (`reasoningLevel: "stream"`). If the model doesn’t emit reasoning deltas, typing won’t start. -- Heartbeats never show typing, regardless of mode. +- Heartbeat typing is a liveness signal for the resolved delivery target. It + starts at heartbeat run start instead of following `message` or `thinking` + stream timing. Set `typingMode: "never"` to disable it. +- Heartbeats do not show typing when `target: "none"`, when the target cannot + be resolved, when chat delivery is disabled for the heartbeat, or when the + channel does not support typing. - `typingIntervalSeconds` controls the **refresh cadence**, not the start time. The default is 6 seconds. diff --git a/docs/gateway/heartbeat.md b/docs/gateway/heartbeat.md index a20c4bcf368..a43895a3629 100644 --- a/docs/gateway/heartbeat.md +++ b/docs/gateway/heartbeat.md @@ -262,6 +262,9 @@ Use `accountId` to target a specific account on multi-account channels like Tele outbound message is sent. - If `showOk`, `showAlerts`, and `useIndicator` are all disabled, the run is skipped up front as `reason=alerts-disabled`. - If only alert delivery is disabled, OpenClaw can still run the heartbeat, update due-task timestamps, restore the session idle timestamp, and suppress the outward alert payload. +- If the resolved heartbeat target supports typing, OpenClaw shows typing while + the heartbeat run is active. This uses the same target the heartbeat would + send chat output to, and it is disabled by `typingMode: "never"`. - Heartbeat-only replies do **not** keep the session alive; the last `updatedAt` is restored so idle expiry behaves normally. - Detached [background tasks](/automation/tasks) can enqueue a system event and wake heartbeat when the main session should notice something quickly. That wake does not make the heartbeat run a background task. diff --git a/docs/plugins/sdk-channel-plugins.md b/docs/plugins/sdk-channel-plugins.md index 6f54b685bf0..39c07293500 100644 --- a/docs/plugins/sdk-channel-plugins.md +++ b/docs/plugins/sdk-channel-plugins.md @@ -31,10 +31,17 @@ shared `message` tool in core. Your plugin owns: - **Session grammar** — how provider-specific conversation ids map to base chats, thread ids, and parent fallbacks - **Outbound** — sending text, media, and polls to the platform - **Threading** — how replies are threaded +- **Heartbeat typing** — optional typing/busy signals for heartbeat delivery targets Core owns the shared message tool, prompt wiring, the outer session-key shape, generic `:thread:` bookkeeping, and dispatch. +If your channel supports typing indicators outside inbound replies, expose +`heartbeat.sendTyping(...)` on the channel plugin. Core calls it with the +resolved heartbeat delivery target before the heartbeat model run starts and +uses the shared typing keepalive/cleanup lifecycle. Add `heartbeat.clearTyping(...)` +when the platform needs an explicit stop signal. + If your channel adds message-tool params that carry media sources, expose those param names through `describeMessageTool(...).mediaSourceParams`. Core uses that explicit list for sandbox path normalization and outbound media-access diff --git a/extensions/discord/src/channel.test.ts b/extensions/discord/src/channel.test.ts index 027f6cf424f..fc33a067390 100644 --- a/extensions/discord/src/channel.test.ts +++ b/extensions/discord/src/channel.test.ts @@ -244,6 +244,30 @@ describe("discordPlugin outbound", () => { } }); + it("forwards heartbeat typing through the run config and attached target", async () => { + const sendTypingDiscord = vi.fn(async () => ({ ok: true, channelId: "thread-123" })); + const sendTypingSpy = vi + .spyOn(sendModule, "sendTypingDiscord") + .mockImplementation(sendTypingDiscord); + try { + const cfg = createCfg(); + + await discordPlugin.heartbeat!.sendTyping!({ + cfg, + to: "channel:123", + accountId: "work", + threadId: "thread-123", + }); + + expect(sendTypingDiscord).toHaveBeenCalledWith("thread-123", { + cfg, + accountId: "work", + }); + } finally { + sendTypingSpy.mockRestore(); + } + }); + it("uses direct Discord probe helpers for status probes", async () => { const runtimeProbeDiscord = vi.fn(async () => { throw new Error("runtime Discord probe should not be used"); diff --git a/extensions/discord/src/channel.ts b/extensions/discord/src/channel.ts index 87b00d302d5..d193dca68fe 100644 --- a/extensions/discord/src/channel.ts +++ b/extensions/discord/src/channel.ts @@ -541,6 +541,21 @@ export const discordPlugin: ChannelPlugin maxAgeMs, }).map(toConversationLifecycleBinding), }, + heartbeat: { + sendTyping: async ({ cfg, to, accountId, threadId }) => { + const resolvedTo = resolveDiscordAttachedOutboundTarget({ to, threadId }); + const target = parseDiscordTarget(resolvedTo, { defaultKind: "channel" }); + if (!target || target.kind !== "channel") { + return; + } + await ( + await loadDiscordSendModule() + ).sendTypingDiscord(target.id, { + cfg, + accountId: accountId ?? undefined, + }); + }, + }, status: createComputedAccountStatusAdapter({ defaultRuntime: createDefaultChannelRuntimeState(DEFAULT_ACCOUNT_ID, { connected: false, diff --git a/extensions/matrix/src/channel.runtime.ts b/extensions/matrix/src/channel.runtime.ts index e3d8c9d05c5..dba1d8c6ef5 100644 --- a/extensions/matrix/src/channel.runtime.ts +++ b/extensions/matrix/src/channel.runtime.ts @@ -1,7 +1,7 @@ import { listMatrixDirectoryGroupsLive, listMatrixDirectoryPeersLive } from "./directory-live.js"; import { resolveMatrixAuth } from "./matrix/client.js"; import { probeMatrix } from "./matrix/probe.js"; -import { sendMessageMatrix } from "./matrix/send.js"; +import { sendMessageMatrix, sendTypingMatrix } from "./matrix/send.js"; import { matrixOutbound } from "./outbound.js"; import { resolveMatrixTargets } from "./resolve-targets.js"; @@ -13,4 +13,5 @@ export const matrixChannelRuntime = { resolveMatrixAuth, resolveMatrixTargets, sendMessageMatrix, + sendTypingMatrix, }; diff --git a/extensions/matrix/src/channel.ts b/extensions/matrix/src/channel.ts index b6712f50dad..f410a468c25 100644 --- a/extensions/matrix/src/channel.ts +++ b/extensions/matrix/src/channel.ts @@ -521,6 +521,24 @@ export const matrixPlugin: ChannelPlugin = lifecycle: { runStartupMaintenance: runMatrixStartupMaintenance, }, + heartbeat: { + sendTyping: async ({ cfg, to, accountId }) => { + await ( + await loadMatrixChannelRuntime() + ).sendTypingMatrix(to, true, { + cfg: cfg as CoreConfig, + ...(accountId ? { accountId } : {}), + }); + }, + clearTyping: async ({ cfg, to, accountId }) => { + await ( + await loadMatrixChannelRuntime() + ).sendTypingMatrix(to, false, { + cfg: cfg as CoreConfig, + ...(accountId ? { accountId } : {}), + }); + }, + }, }, security: { resolveDmPolicy: resolveMatrixDmPolicy, diff --git a/extensions/matrix/src/matrix/send.test.ts b/extensions/matrix/src/matrix/send.test.ts index 9ff9da0c364..145bc8e8915 100644 --- a/extensions/matrix/src/matrix/send.test.ts +++ b/extensions/matrix/src/matrix/send.test.ts @@ -19,6 +19,7 @@ const loadWebMediaMock = vi.fn().mockResolvedValue({ kind: "image", }); const loadConfigMock = vi.fn(() => ({})); +const withResolvedRuntimeMatrixClientMock = vi.hoisted(() => vi.fn()); const getImageMetadataMock = vi.fn().mockResolvedValue(null); const resizeToJpegMock = vi.fn(); const mediaKindFromMimeMock = vi.fn((_: string | null | undefined) => "image"); @@ -36,6 +37,10 @@ vi.mock("./outbound-media-runtime.js", () => ({ loadOutboundMediaFromUrl: loadOutboundMediaFromUrlMock, })); +vi.mock("./client-bootstrap.js", () => ({ + withResolvedRuntimeMatrixClient: withResolvedRuntimeMatrixClientMock, +})); + const runtimeStub = { config: { loadConfig: () => loadConfigMock(), @@ -138,6 +143,19 @@ function resetMatrixSendRuntimeMocks() { kind: "image", }); loadConfigMock.mockReset().mockReturnValue({}); + withResolvedRuntimeMatrixClientMock + .mockReset() + .mockImplementation( + async ( + opts: { client?: import("./sdk.js").MatrixClient }, + run: (resolved: import("./sdk.js").MatrixClient) => Promise, + ) => { + if (!opts.client) { + throw new Error("test Matrix client is required"); + } + return await run(opts.client); + }, + ); getImageMetadataMock.mockReset().mockResolvedValue(null); resizeToJpegMock.mockReset(); mediaKindFromMimeMock.mockReset().mockReturnValue("image"); @@ -187,8 +205,10 @@ describe("sendMessageMatrix media", () => { mediaUrl: "file:///tmp/photo.png", }); - const uploadArg = uploadContent.mock.calls[0]?.[0] as Buffer | undefined; - expect(uploadArg?.toString()).toBe("encrypted"); + const uploadArg = uploadContent.mock.calls[0]?.[0]; + expect(uploadArg instanceof Uint8Array ? Buffer.from(uploadArg).toString() : undefined).toBe( + "encrypted", + ); const content = sendMessage.mock.calls[0]?.[1] as { url?: string; @@ -955,4 +975,34 @@ describe("sendTypingMatrix", () => { expect(setTyping).toHaveBeenCalledWith("!room:example", true, 30_000); }); + + it("passes account config through when resolving the typing client", async () => { + const cfg = { channels: { matrix: {} } } as unknown as import("../types.js").CoreConfig; + const setTyping = vi.fn().mockResolvedValue(undefined); + const client = { + setTyping, + } as unknown as import("./sdk.js").MatrixClient; + withResolvedRuntimeMatrixClientMock.mockImplementation( + async ( + opts: Record, + run: (resolved: import("./sdk.js").MatrixClient) => Promise, + ) => { + expect(opts).toMatchObject({ + cfg, + accountId: "work", + timeoutMs: 12_345, + readiness: "none", + }); + return await run(client); + }, + ); + + await sendTypingMatrix("room:!room:example", true, { + cfg, + accountId: "work", + timeoutMs: 12_345, + }); + + expect(setTyping).toHaveBeenCalledWith("!room:example", true, 12_345); + }); }); diff --git a/extensions/matrix/src/matrix/send.ts b/extensions/matrix/src/matrix/send.ts index b2f63a6e5e3..5b223bb1d33 100644 --- a/extensions/matrix/src/matrix/send.ts +++ b/extensions/matrix/src/matrix/send.ts @@ -372,17 +372,26 @@ export async function sendPollMatrix( export async function sendTypingMatrix( roomId: string, typing: boolean, - timeoutMs?: number, + optsOrTimeoutMs?: number | MatrixClientResolveOpts, client?: MatrixClient, ): Promise { + const opts = + typeof optsOrTimeoutMs === "number" + ? { timeoutMs: optsOrTimeoutMs, ...(client ? { client } : {}) } + : { + ...normalizeMatrixClientResolveOpts(optsOrTimeoutMs), + ...(client ? { client } : {}), + }; await withResolvedMatrixControlClient( { - client, - timeoutMs, + client: opts.client, + cfg: opts.cfg, + timeoutMs: opts.timeoutMs, + accountId: opts.accountId, }, async (resolved) => { const resolvedRoom = await resolveMatrixRoomId(resolved, roomId); - const resolvedTimeoutMs = typeof timeoutMs === "number" ? timeoutMs : 30_000; + const resolvedTimeoutMs = typeof opts.timeoutMs === "number" ? opts.timeoutMs : 30_000; await resolved.setTyping(resolvedRoom, typing, resolvedTimeoutMs); }, ); diff --git a/extensions/signal/src/channel.ts b/extensions/signal/src/channel.ts index 2197cfafe84..cfda32cea25 100644 --- a/extensions/signal/src/channel.ts +++ b/extensions/signal/src/channel.ts @@ -279,6 +279,25 @@ export const signalPlugin: ChannelPlugin = hint: "", }, }, + heartbeat: { + sendTyping: async ({ cfg, to, accountId }) => { + await ( + await loadSignalSendRuntime() + ).sendTypingSignal(to, { + cfg, + ...(accountId ? { accountId } : {}), + }); + }, + clearTyping: async ({ cfg, to, accountId }) => { + await ( + await loadSignalSendRuntime() + ).sendTypingSignal(to, { + cfg, + ...(accountId ? { accountId } : {}), + stop: true, + }); + }, + }, status: createComputedAccountStatusAdapter({ defaultRuntime: createDefaultChannelRuntimeState(DEFAULT_ACCOUNT_ID), collectStatusIssues: (accounts) => collectStatusIssuesFromLastError("signal", accounts), diff --git a/extensions/signal/src/send.runtime.ts b/extensions/signal/src/send.runtime.ts index 430924a77d3..82258f95fa5 100644 --- a/extensions/signal/src/send.runtime.ts +++ b/extensions/signal/src/send.runtime.ts @@ -1 +1 @@ -export { sendMessageSignal } from "./send.js"; +export { sendMessageSignal, sendTypingSignal } from "./send.js"; diff --git a/extensions/signal/src/send.ts b/extensions/signal/src/send.ts index 7947b4fdd2a..e31c0d1cf81 100644 --- a/extensions/signal/src/send.ts +++ b/extensions/signal/src/send.ts @@ -31,7 +31,10 @@ export type SignalSendResult = { timestamp?: number; }; -export type SignalRpcOpts = Pick; +export type SignalRpcOpts = Pick< + SignalSendOpts, + "cfg" | "baseUrl" | "account" | "accountId" | "timeoutMs" +>; export type SignalReceiptType = "read" | "viewed"; diff --git a/extensions/telegram/src/channel.ts b/extensions/telegram/src/channel.ts index 28002323623..914a5944a29 100644 --- a/extensions/telegram/src/channel.ts +++ b/extensions/telegram/src/channel.ts @@ -734,6 +734,16 @@ export const telegramPlugin = createChatChannelPlugin({ await deleteTelegramUpdateOffset({ accountId }); }, }, + heartbeat: { + sendTyping: async ({ cfg, to, accountId, threadId }) => { + const { sendTypingTelegram } = await loadTelegramSendModule(); + await sendTypingTelegram(to, { + cfg, + ...(accountId ? { accountId } : {}), + messageThreadId: parseTelegramThreadId(threadId), + }); + }, + }, approvalCapability: { ...telegramApprovalCapability, render: { diff --git a/extensions/whatsapp/src/channel.ts b/extensions/whatsapp/src/channel.ts index 9e0af3b535e..6508de908f3 100644 --- a/extensions/whatsapp/src/channel.ts +++ b/extensions/whatsapp/src/channel.ts @@ -34,6 +34,7 @@ import { normalizeWhatsAppTarget, } from "./normalize.js"; import { getWhatsAppRuntime } from "./runtime.js"; +import { sendTypingWhatsApp } from "./send.js"; import { resolveWhatsAppOutboundSessionRoute } from "./session-route.js"; import { whatsappSetupAdapter } from "./setup-core.js"; import { @@ -169,6 +170,12 @@ export const whatsappPlugin: ChannelPlugin = heartbeat: { checkReady: async ({ cfg, accountId, deps }) => await checkWhatsAppHeartbeatReady({ cfg, accountId: accountId ?? undefined, deps }), + sendTyping: async ({ cfg, to, accountId }) => { + await sendTypingWhatsApp(to, { + cfg, + ...(accountId ? { accountId } : {}), + }); + }, resolveRecipients: ({ cfg, opts }) => resolveWhatsAppHeartbeatRecipients(cfg, opts), }, status: createAsyncComputedAccountStatusAdapter({ diff --git a/extensions/whatsapp/src/send.ts b/extensions/whatsapp/src/send.ts index c397c89c397..9e011d33bbc 100644 --- a/extensions/whatsapp/src/send.ts +++ b/extensions/whatsapp/src/send.ts @@ -161,6 +161,21 @@ export async function sendMessageWhatsApp( } } +export async function sendTypingWhatsApp( + to: string, + options: { + cfg?: OpenClawConfig; + accountId?: string; + } = {}, +): Promise { + const cfg = options.cfg ?? loadConfig(); + const { listener: active } = requireOutboundActiveWebListener({ + cfg, + accountId: options.accountId, + }); + await active.sendComposingTo(to); +} + export async function sendReactionWhatsApp( chatJid: string, messageId: string, diff --git a/src/channels/plugins/types.adapters.ts b/src/channels/plugins/types.adapters.ts index aa156f8cec7..68b3f85d6fa 100644 --- a/src/channels/plugins/types.adapters.ts +++ b/src/channels/plugins/types.adapters.ts @@ -367,6 +367,20 @@ export type ChannelHeartbeatAdapter = { accountId?: string | null; deps?: ChannelHeartbeatDeps; }) => Promise<{ ok: boolean; reason: string }>; + sendTyping?: (params: { + cfg: OpenClawConfig; + to: string; + accountId?: string | null; + threadId?: string | number | null; + deps?: ChannelHeartbeatDeps; + }) => Promise | void; + clearTyping?: (params: { + cfg: OpenClawConfig; + to: string; + accountId?: string | null; + threadId?: string | number | null; + deps?: ChannelHeartbeatDeps; + }) => Promise | void; resolveRecipients?: (params: { cfg: OpenClawConfig; opts?: { to?: string; all?: boolean; accountId?: string }; diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 9d8edeece6f..2f886163319 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -86,6 +86,7 @@ import { resolveHeartbeatSummaryForAgent, type HeartbeatSummary, } from "./heartbeat-summary.js"; +import { createHeartbeatTypingCallbacks } from "./heartbeat-typing.js"; import { resolveHeartbeatVisibility } from "./heartbeat-visibility.js"; import { areHeartbeatsEnabled, @@ -226,6 +227,21 @@ function resolveHeartbeatAckMaxChars(cfg: OpenClawConfig, heartbeat?: HeartbeatC ); } +function isHeartbeatTypingEnabled(params: { cfg: OpenClawConfig; hasChatDelivery: boolean }) { + if (!params.hasChatDelivery) { + return false; + } + const agentCfg = params.cfg.agents?.defaults; + const typingMode = params.cfg.session?.typingMode ?? agentCfg?.typingMode; + return typingMode !== "never"; +} + +function resolveHeartbeatTypingIntervalSeconds(cfg: OpenClawConfig) { + const agentCfg = cfg.agents?.defaults; + const configured = agentCfg?.typingIntervalSeconds ?? cfg.session?.typingIntervalSeconds; + return typeof configured === "number" && configured > 0 ? configured : undefined; +} + function resolveHeartbeatSession( cfg: OpenClawConfig, agentId?: string, @@ -987,6 +1003,34 @@ export async function runHeartbeatOnce(opts: { const canAttemptHeartbeatOk = Boolean( visibility.showOk && delivery.channel !== "none" && delivery.to, ); + const hasChatDelivery = Boolean( + delivery.channel !== "none" && delivery.to && (visibility.showAlerts || visibility.showOk), + ); + const heartbeatTypingIntervalSeconds = resolveHeartbeatTypingIntervalSeconds(cfg); + const heartbeatChannelPlugin = + delivery.channel !== "none" ? resolveHeartbeatChannelPlugin(delivery.channel) : undefined; + const heartbeatTyping = + delivery.channel !== "none" && + isHeartbeatTypingEnabled({ + cfg, + hasChatDelivery, + }) + ? createHeartbeatTypingCallbacks({ + cfg, + target: { + channel: delivery.channel, + ...(delivery.to !== undefined ? { to: delivery.to } : {}), + ...(delivery.accountId !== undefined ? { accountId: delivery.accountId } : {}), + ...(delivery.threadId !== undefined ? { threadId: delivery.threadId } : {}), + }, + ...(heartbeatChannelPlugin ? { plugin: heartbeatChannelPlugin } : {}), + ...(opts.deps ? { deps: opts.deps } : {}), + ...(heartbeatTypingIntervalSeconds !== undefined + ? { typingIntervalSeconds: heartbeatTypingIntervalSeconds } + : {}), + log, + }) + : undefined; const maybeSendHeartbeatOk = async () => { if (!canAttemptHeartbeatOk || delivery.channel === "none" || !delivery.to) { return false; @@ -1016,6 +1060,7 @@ export async function runHeartbeatOnce(opts: { }; try { + await heartbeatTyping?.onReplyStart(); const heartbeatModelOverride = normalizeOptionalString(heartbeat?.model); const suppressToolErrorWarnings = heartbeat?.suppressToolErrorWarnings === true; const timeoutOverrideSeconds = @@ -1264,6 +1309,8 @@ export async function runHeartbeatOnce(opts: { }); log.error(`heartbeat failed: ${reason}`, { error: reason }); return { status: "failed", reason }; + } finally { + heartbeatTyping?.onCleanup?.(); } } diff --git a/src/infra/heartbeat-runner.typing.test.ts b/src/infra/heartbeat-runner.typing.test.ts new file mode 100644 index 00000000000..f8284b50578 --- /dev/null +++ b/src/infra/heartbeat-runner.typing.test.ts @@ -0,0 +1,181 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { ChannelPlugin } from "../channels/plugins/types.public.js"; +import type { OpenClawConfig } from "../config/config.js"; +import { setActivePluginRegistry } from "../plugins/runtime.js"; +import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js"; +import { runHeartbeatOnce } from "./heartbeat-runner.js"; +import { seedMainSessionStore, withTempHeartbeatSandbox } from "./heartbeat-runner.test-utils.js"; + +const TELEGRAM_TARGET = "-1001234567890"; + +function installHeartbeatTypingPlugin(params: { + sendTyping: NonNullable["sendTyping"]>; + clearTyping?: NonNullable["clearTyping"]; +}) { + const plugin: ChannelPlugin = { + ...createOutboundTestPlugin({ + id: "telegram", + label: "Telegram", + docsPath: "/channels/telegram", + outbound: { + deliveryMode: "direct", + sendText: async () => ({ channel: "telegram", messageId: "m1" }), + }, + }), + heartbeat: { + sendTyping: params.sendTyping, + ...(params.clearTyping ? { clearTyping: params.clearTyping } : {}), + }, + }; + setActivePluginRegistry(createTestRegistry([{ pluginId: "telegram", plugin, source: "test" }])); +} + +function createHeartbeatConfig(params: { + tmpDir: string; + storePath: string; + session?: OpenClawConfig["session"]; + channelHeartbeat?: Record; +}): OpenClawConfig { + return { + agents: { + defaults: { + workspace: params.tmpDir, + heartbeat: { every: "5m", target: "telegram" }, + }, + }, + channels: { + telegram: { + allowFrom: ["*"], + ...(params.channelHeartbeat ? { heartbeat: params.channelHeartbeat } : {}), + }, + }, + session: { + store: params.storePath, + ...params.session, + }, + } as OpenClawConfig; +} + +async function seedTelegramSession(storePath: string, cfg: OpenClawConfig) { + await seedMainSessionStore(storePath, cfg, { + lastChannel: "telegram", + lastProvider: "telegram", + lastTo: TELEGRAM_TARGET, + }); +} + +describe("runHeartbeatOnce heartbeat typing", () => { + beforeEach(() => { + setActivePluginRegistry(createTestRegistry()); + }); + + it("starts and clears typing around a heartbeat run", async () => { + await withTempHeartbeatSandbox(async ({ tmpDir, storePath, replySpy }) => { + const sendTyping = vi.fn(async () => undefined); + const clearTyping = vi.fn(async () => undefined); + installHeartbeatTypingPlugin({ sendTyping, clearTyping }); + const cfg = createHeartbeatConfig({ tmpDir, storePath }); + await seedTelegramSession(storePath, cfg); + replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" }); + + await runHeartbeatOnce({ + cfg, + deps: { + getReplyFromConfig: replySpy, + getQueueSize: () => 0, + nowMs: () => 0, + }, + }); + + expect(sendTyping).toHaveBeenCalledWith( + expect.objectContaining({ + cfg, + to: TELEGRAM_TARGET, + }), + ); + expect(clearTyping).toHaveBeenCalledWith( + expect.objectContaining({ + cfg, + to: TELEGRAM_TARGET, + }), + ); + expect(sendTyping.mock.invocationCallOrder[0]).toBeLessThan( + replySpy.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); + }); + }); + + it("clears typing when the heartbeat run fails", async () => { + await withTempHeartbeatSandbox(async ({ tmpDir, storePath, replySpy }) => { + const sendTyping = vi.fn(async () => undefined); + const clearTyping = vi.fn(async () => undefined); + installHeartbeatTypingPlugin({ sendTyping, clearTyping }); + const cfg = createHeartbeatConfig({ tmpDir, storePath }); + await seedTelegramSession(storePath, cfg); + replySpy.mockRejectedValue(new Error("model unavailable")); + + const result = await runHeartbeatOnce({ + cfg, + deps: { + getReplyFromConfig: replySpy, + getQueueSize: () => 0, + nowMs: () => 0, + }, + }); + + expect(result.status).toBe("failed"); + expect(sendTyping).toHaveBeenCalledTimes(1); + expect(clearTyping).toHaveBeenCalledTimes(1); + }); + }); + + it("does not type when typingMode is never", async () => { + await withTempHeartbeatSandbox(async ({ tmpDir, storePath, replySpy }) => { + const sendTyping = vi.fn(async () => undefined); + installHeartbeatTypingPlugin({ sendTyping }); + const cfg = createHeartbeatConfig({ + tmpDir, + storePath, + session: { typingMode: "never" }, + }); + await seedTelegramSession(storePath, cfg); + replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" }); + + await runHeartbeatOnce({ + cfg, + deps: { + getReplyFromConfig: replySpy, + getQueueSize: () => 0, + nowMs: () => 0, + }, + }); + + expect(sendTyping).not.toHaveBeenCalled(); + }); + }); + + it("does not type when chat heartbeat delivery is disabled", async () => { + await withTempHeartbeatSandbox(async ({ tmpDir, storePath, replySpy }) => { + const sendTyping = vi.fn(async () => undefined); + installHeartbeatTypingPlugin({ sendTyping }); + const cfg = createHeartbeatConfig({ + tmpDir, + storePath, + channelHeartbeat: { showAlerts: false, showOk: false, useIndicator: true }, + }); + await seedTelegramSession(storePath, cfg); + replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" }); + + await runHeartbeatOnce({ + cfg, + deps: { + getReplyFromConfig: replySpy, + getQueueSize: () => 0, + nowMs: () => 0, + }, + }); + + expect(sendTyping).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/src/infra/heartbeat-typing.test.ts b/src/infra/heartbeat-typing.test.ts new file mode 100644 index 00000000000..aaa87f33d01 --- /dev/null +++ b/src/infra/heartbeat-typing.test.ts @@ -0,0 +1,47 @@ +import { describe, expect, it, vi } from "vitest"; +import type { ChannelPlugin } from "../channels/plugins/types.public.js"; +import type { OpenClawConfig } from "../config/config.js"; +import { createHeartbeatTypingCallbacks } from "./heartbeat-typing.js"; + +async function withFakeTimers(run: () => Promise) { + vi.useFakeTimers(); + try { + await run(); + } finally { + vi.useRealTimers(); + } +} + +describe("createHeartbeatTypingCallbacks", () => { + it("uses the normal 6s typing cadence by default", async () => { + await withFakeTimers(async () => { + const sendTyping = vi.fn(async () => undefined); + const plugin = { + heartbeat: { + sendTyping, + }, + } satisfies Pick; + + const callbacks = createHeartbeatTypingCallbacks({ + cfg: {} as OpenClawConfig, + target: { + channel: "telegram", + to: "123", + }, + plugin, + }); + + expect(callbacks).toBeDefined(); + await callbacks?.onReplyStart(); + expect(sendTyping).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(5_999); + expect(sendTyping).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(1); + expect(sendTyping).toHaveBeenCalledTimes(2); + + callbacks?.onCleanup?.(); + }); + }); +}); diff --git a/src/infra/heartbeat-typing.ts b/src/infra/heartbeat-typing.ts new file mode 100644 index 00000000000..14fe06a0f66 --- /dev/null +++ b/src/infra/heartbeat-typing.ts @@ -0,0 +1,65 @@ +import type { ChannelHeartbeatDeps, ChannelPlugin } from "../channels/plugins/types.public.js"; +import { createTypingCallbacks, type TypingCallbacks } from "../channels/typing.js"; +import type { OpenClawConfig } from "../config/types.openclaw.js"; + +const DEFAULT_HEARTBEAT_TYPING_INTERVAL_SECONDS = 6; + +type HeartbeatTypingLogger = { + debug?: (message: string, meta?: Record) => void; +}; + +export type HeartbeatTypingTarget = { + channel: string; + to?: string; + accountId?: string | null; + threadId?: string | number | null; +}; + +export function createHeartbeatTypingCallbacks(params: { + cfg: OpenClawConfig; + target: HeartbeatTypingTarget; + plugin?: Pick; + deps?: ChannelHeartbeatDeps; + typingIntervalSeconds?: number; + log?: HeartbeatTypingLogger; +}): TypingCallbacks | undefined { + const sendTyping = params.plugin?.heartbeat?.sendTyping; + const to = params.target.to?.trim(); + if (!sendTyping || !to) { + return undefined; + } + + const clearTyping = params.plugin?.heartbeat?.clearTyping; + const keepaliveIntervalMs = + typeof params.typingIntervalSeconds === "number" && params.typingIntervalSeconds > 0 + ? params.typingIntervalSeconds * 1000 + : DEFAULT_HEARTBEAT_TYPING_INTERVAL_SECONDS * 1000; + const target = { + cfg: params.cfg, + to, + ...(params.target.accountId !== undefined ? { accountId: params.target.accountId } : {}), + ...(params.target.threadId !== undefined ? { threadId: params.target.threadId } : {}), + ...(params.deps ? { deps: params.deps } : {}), + }; + + return createTypingCallbacks({ + start: async () => { + await sendTyping(target); + }, + ...(clearTyping + ? { + stop: async () => { + await clearTyping(target); + }, + } + : {}), + ...(keepaliveIntervalMs ? { keepaliveIntervalMs } : {}), + onStartError: (err) => { + params.log?.debug?.(`heartbeat typing failed for ${params.target.channel}`, { + error: String(err), + channel: params.target.channel, + accountId: params.target.accountId, + }); + }, + }); +}