From e0b8b80067cf0c997ef9ca8205b83f106d5a8fcb Mon Sep 17 00:00:00 2001 From: Eric Lytle Date: Fri, 20 Feb 2026 09:05:57 +0000 Subject: [PATCH] feat(hooks): add message:transcribed and message:preprocessed internal hooks Adds two new internal hook events that fire after media/link processing: - message:transcribed: fires when audio has been transcribed, providing the transcript text alongside the original body and media metadata. Useful for logging, analytics, or routing based on spoken content. - message:preprocessed: fires for every message after all media + link understanding completes. Gives hooks access to the fully enriched body (transcripts, image descriptions, link summaries) before the agent sees it. Both hooks are added in get-reply.ts, after applyMediaUnderstanding and applyLinkUnderstanding. message:received and message:sent are already in upstream (f07bb8e8) and are not duplicated here. Typed contexts (MessageTranscribedHookContext, MessagePreprocessedHookContext) and type guards (isMessageTranscribedEvent, isMessagePreprocessedEvent) added to internal-hooks.ts alongside the existing received/sent types. Test coverage in src/hooks/message-hooks.test.ts. --- docs/automation/hooks.md | 4 +- src/auto-reply/reply/get-reply.ts | 69 ++++++ src/config/types.hooks.ts | 2 +- src/hooks/internal-hooks.ts | 112 +++++++++ src/hooks/message-hooks.test.ts | 379 ++++++++++++++++++++++++++++++ 5 files changed, 564 insertions(+), 2 deletions(-) create mode 100644 src/hooks/message-hooks.test.ts diff --git a/docs/automation/hooks.md b/docs/automation/hooks.md index 0f561741d9a..3280475f6d5 100644 --- a/docs/automation/hooks.md +++ b/docs/automation/hooks.md @@ -258,7 +258,9 @@ Triggered when the gateway starts: Triggered when messages are received or sent: - **`message`**: All message events (general listener) -- **`message:received`**: When an inbound message is received from any channel +- **`message:received`**: When an inbound message is received from any channel. 
Fires early in processing before media understanding. Content may contain raw placeholders like `<media:audio>` for media attachments that haven't been processed yet. +- **`message:transcribed`**: When an audio message has been transcribed. Fires after media understanding and link understanding have run, and only if a transcript was produced. At this point, `transcript` contains the full transcript text for the audio message. Use this hook when you need access to transcribed audio content. +- **`message:preprocessed`**: Fires for every message after all media + link understanding completes, giving hooks access to the fully enriched body (transcripts, image descriptions, link summaries) before the agent sees it. - **`message:sent`**: When an outbound message is successfully sent #### Message Event Context diff --git a/src/auto-reply/reply/get-reply.ts b/src/auto-reply/reply/get-reply.ts index 5c4edd35ac1..2a4fa0d1b6e 100644 --- a/src/auto-reply/reply/get-reply.ts +++ b/src/auto-reply/reply/get-reply.ts @@ -9,6 +9,8 @@ import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; import { DEFAULT_AGENT_WORKSPACE_DIR, ensureAgentWorkspace } from "../../agents/workspace.js"; import { resolveChannelModelOverride } from "../../channels/model-overrides.js"; import { type OpenClawConfig, loadConfig } from "../../config/config.js"; +import { logVerbose } from "../../globals.js"; +import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; import { applyLinkUnderstanding } from "../../link-understanding/apply.js"; import { applyMediaUnderstanding } from "../../media-understanding/apply.js"; import { defaultRuntime } from "../../runtime.js"; @@ -136,6 +138,73 @@ export async function getReplyFromConfig( }); } + const channelId = ( + finalized.OriginatingChannel ?? + finalized.Surface ?? + finalized.Provider ?? 
+ "" + ).toLowerCase(); + + // Trigger message:transcribed hook after media understanding completes + // Only fire if transcription actually occurred (skip in fast test mode or non-audio) + if (finalized.Transcript) { + void triggerInternalHook( + createInternalHookEvent("message", "transcribed", finalized.SessionKey ?? "", { + from: finalized.From, + to: finalized.To, + body: finalized.Body, + bodyForAgent: finalized.BodyForAgent, + transcript: finalized.Transcript, + timestamp: finalized.Timestamp, + channelId, + conversationId: finalized.OriginatingTo ?? finalized.To ?? finalized.From ?? undefined, + messageId: finalized.MessageSid, + senderId: finalized.SenderId, + senderName: finalized.SenderName, + senderUsername: finalized.SenderUsername, + provider: finalized.Provider, + surface: finalized.Surface, + mediaPath: finalized.MediaPath, + mediaType: finalized.MediaType, + cfg, + }), + ).catch((err) => { + logVerbose(`get-reply: message:transcribed internal hook failed: ${String(err)}`); + }); + } + + // Trigger message:preprocessed hook after all media + link understanding. + // Fires for every message, giving hooks access to the fully enriched body + // (transcripts, image descriptions, link summaries) before the agent sees it. + void triggerInternalHook( + createInternalHookEvent("message", "preprocessed", finalized.SessionKey ?? "", { + from: finalized.From, + to: finalized.To, + body: finalized.Body, + bodyForAgent: finalized.BodyForAgent, + transcript: finalized.Transcript, + timestamp: finalized.Timestamp, + channelId, + conversationId: finalized.OriginatingTo ?? finalized.To ?? finalized.From ?? 
undefined, + messageId: finalized.MessageSid, + senderId: finalized.SenderId, + senderName: finalized.SenderName, + senderUsername: finalized.SenderUsername, + provider: finalized.Provider, + surface: finalized.Surface, + mediaPath: finalized.MediaPath, + mediaType: finalized.MediaType, + isGroup: Boolean(finalized.GroupSubject || finalized.GroupChannel), + groupId: + finalized.From?.includes(":group:") || finalized.From?.includes(":channel:") + ? finalized.From + : undefined, + cfg, + }), + ).catch((err) => { + logVerbose(`get-reply: message:preprocessed internal hook failed: ${String(err)}`); + }); + const commandAuthorized = finalized.CommandAuthorized; resolveCommandAuthorization({ ctx: finalized, diff --git a/src/config/types.hooks.ts b/src/config/types.hooks.ts index dc9086ed706..3c5f7a74f0e 100644 --- a/src/config/types.hooks.ts +++ b/src/config/types.hooks.ts @@ -73,7 +73,7 @@ export type HooksGmailConfig = { }; export type InternalHookHandlerConfig = { - /** Event key to listen for (e.g., 'command:new', 'session:start') */ + /** Event key to listen for (e.g., 'command:new', 'message:received', 'message:transcribed', 'session:start') */ event: string; /** Path to handler module (workspace-relative) */ module: string; diff --git a/src/hooks/internal-hooks.ts b/src/hooks/internal-hooks.ts index 95c70597f2b..70c6a06ccf2 100644 --- a/src/hooks/internal-hooks.ts +++ b/src/hooks/internal-hooks.ts @@ -93,6 +93,92 @@ export type MessageSentHookEvent = InternalHookEvent & { context: MessageSentHookContext; }; +export type MessageTranscribedHookContext = { + /** Sender identifier (e.g., phone number, user ID) */ + from?: string; + /** Recipient identifier */ + to?: string; + /** Original raw message body (e.g., "🎤 [Audio]") */ + body?: string; + /** Enriched body shown to the agent, including transcript */ + bodyForAgent?: string; + /** The transcribed text from audio */ + transcript: string; + /** Unix timestamp when the message was received */ + timestamp?: number; 
+ /** Channel identifier (e.g., "telegram", "whatsapp") */ + channelId: string; + /** Conversation/chat ID */ + conversationId?: string; + /** Message ID from the provider */ + messageId?: string; + /** Sender user ID */ + senderId?: string; + /** Sender display name */ + senderName?: string; + /** Sender username */ + senderUsername?: string; + /** Provider name */ + provider?: string; + /** Surface name */ + surface?: string; + /** Path to the media file that was transcribed */ + mediaPath?: string; + /** MIME type of the media */ + mediaType?: string; +}; + +export type MessageTranscribedHookEvent = InternalHookEvent & { + type: "message"; + action: "transcribed"; + context: MessageTranscribedHookContext; +}; + +export type MessagePreprocessedHookContext = { + /** Sender identifier (e.g., phone number, user ID) */ + from?: string; + /** Recipient identifier */ + to?: string; + /** Original raw message body */ + body?: string; + /** Fully enriched body shown to the agent (transcripts, image descriptions, link summaries) */ + bodyForAgent?: string; + /** Transcribed audio text, if the message contained audio */ + transcript?: string; + /** Unix timestamp when the message was received */ + timestamp?: number; + /** Channel identifier (e.g., "telegram", "whatsapp") */ + channelId: string; + /** Conversation/chat ID */ + conversationId?: string; + /** Message ID from the provider */ + messageId?: string; + /** Sender user ID */ + senderId?: string; + /** Sender display name */ + senderName?: string; + /** Sender username */ + senderUsername?: string; + /** Provider name */ + provider?: string; + /** Surface name */ + surface?: string; + /** Path to the media file, if present */ + mediaPath?: string; + /** MIME type of the media, if present */ + mediaType?: string; + /** Whether this message was sent in a group/channel context */ + isGroup?: boolean; + /** Group or channel identifier, if applicable */ + groupId?: string; +}; + +export type MessagePreprocessedHookEvent 
= InternalHookEvent & { + type: "message"; + action: "preprocessed"; + context: MessagePreprocessedHookContext; +}; + export interface InternalHookEvent { /** The type of event (command, session, agent, gateway, etc.) */ type: InternalHookEventType; @@ -282,3 +368,29 @@ export function isMessageSentEvent(event: InternalHookEvent): event is MessageSe typeof context.success === "boolean" ); } + +export function isMessageTranscribedEvent( + event: InternalHookEvent, +): event is MessageTranscribedHookEvent { + if (event.type !== "message" || event.action !== "transcribed") { + return false; + } + const context = event.context as Partial<MessageTranscribedHookContext> | null; + if (!context || typeof context !== "object") { + return false; + } + return typeof context.transcript === "string" && typeof context.channelId === "string"; +} + +export function isMessagePreprocessedEvent( + event: InternalHookEvent, +): event is MessagePreprocessedHookEvent { + if (event.type !== "message" || event.action !== "preprocessed") { + return false; + } + const context = event.context as Partial<MessagePreprocessedHookContext> | null; + if (!context || typeof context !== "object") { + return false; + } + return typeof context.channelId === "string"; +} diff --git a/src/hooks/message-hooks.test.ts b/src/hooks/message-hooks.test.ts new file mode 100644 index 00000000000..7433784c6c6 --- /dev/null +++ b/src/hooks/message-hooks.test.ts @@ -0,0 +1,379 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + clearInternalHooks, + createInternalHookEvent, + registerInternalHook, + triggerInternalHook, + type InternalHookEvent, +} from "./internal-hooks.js"; + +describe("message hooks", () => { + beforeEach(() => { + clearInternalHooks(); + }); + + afterEach(() => { + clearInternalHooks(); + }); + + describe("message:received", () => { + it("should trigger handler registered for message:received", async () => { + const handler = vi.fn(); + registerInternalHook("message:received", handler); + + const event = 
createInternalHookEvent("message", "received", "session-1", { + from: "user:123", + to: "bot:456", + content: "Hello world", + channelId: "telegram", + senderId: "123", + senderName: "Eric", + senderUsername: "eric_lytle", + }); + await triggerInternalHook(event); + + expect(handler).toHaveBeenCalledOnce(); + expect(handler.mock.calls[0][0].type).toBe("message"); + expect(handler.mock.calls[0][0].action).toBe("received"); + expect(handler.mock.calls[0][0].context.content).toBe("Hello world"); + expect(handler.mock.calls[0][0].context.channelId).toBe("telegram"); + expect(handler.mock.calls[0][0].context.senderName).toBe("Eric"); + }); + + it("should include sender and message metadata in context", async () => { + const handler = vi.fn(); + registerInternalHook("message:received", handler); + + const event = createInternalHookEvent("message", "received", "session-1", { + from: "signal:+15551234567", + to: "bot:+15559876543", + content: "Test message", + channelId: "signal", + conversationId: "conv-abc", + messageId: "msg-xyz", + senderId: "sender-1", + senderName: "Test User", + senderUsername: "testuser", + senderE164: "+15551234567", + provider: "signal", + surface: "signal", + threadId: "thread-1", + originatingChannel: "signal", + originatingTo: "bot:+15559876543", + timestamp: 1707600000, + }); + await triggerInternalHook(event); + + const ctx = handler.mock.calls[0][0].context; + expect(ctx.messageId).toBe("msg-xyz"); + expect(ctx.senderId).toBe("sender-1"); + expect(ctx.senderE164).toBe("+15551234567"); + expect(ctx.threadId).toBe("thread-1"); + expect(ctx.timestamp).toBe(1707600000); + }); + }); + + describe("message:transcribed", () => { + it("should trigger handler registered for message:transcribed", async () => { + const handler = vi.fn(); + registerInternalHook("message:transcribed", handler); + + const event = createInternalHookEvent("message", "transcribed", "session-1", { + from: "user:123", + to: "bot:456", + transcript: "This is what the user 
said", + body: "🎤 Audio message", + channelId: "telegram", + mediaPath: "/tmp/audio.ogg", + mediaType: "audio/ogg", + }); + await triggerInternalHook(event); + + expect(handler).toHaveBeenCalledOnce(); + expect(handler.mock.calls[0][0].action).toBe("transcribed"); + expect(handler.mock.calls[0][0].context.transcript).toBe("This is what the user said"); + expect(handler.mock.calls[0][0].context.mediaType).toBe("audio/ogg"); + }); + + it("should include both raw body and transcript in context", async () => { + const handler = vi.fn(); + registerInternalHook("message:transcribed", handler); + + const event = createInternalHookEvent("message", "transcribed", "session-1", { + body: "🎤 [Audio]", + bodyForAgent: "[Audio] Transcript: Hello from voice", + transcript: "Hello from voice", + channelId: "telegram", + }); + await triggerInternalHook(event); + + const ctx = handler.mock.calls[0][0].context; + expect(ctx.body).toBe("🎤 [Audio]"); + expect(ctx.bodyForAgent).toBe("[Audio] Transcript: Hello from voice"); + expect(ctx.transcript).toBe("Hello from voice"); + }); + }); + + describe("message:preprocessed", () => { + it("should trigger handler registered for message:preprocessed", async () => { + const handler = vi.fn(); + registerInternalHook("message:preprocessed", handler); + + const event = createInternalHookEvent("message", "preprocessed", "session-1", { + from: "user:123", + to: "bot:456", + body: "Check out this link", + bodyForAgent: "Check out this link\n[Link summary: Article about testing]", + channelId: "telegram", + senderId: "123", + senderName: "Eric", + isGroup: false, + }); + await triggerInternalHook(event); + + expect(handler).toHaveBeenCalledOnce(); + expect(handler.mock.calls[0][0].action).toBe("preprocessed"); + expect(handler.mock.calls[0][0].context.bodyForAgent).toContain("Link summary"); + }); + + it("should include both transcript and link summary for enriched audio messages", async () => { + const handler = vi.fn(); + 
registerInternalHook("message:preprocessed", handler); + + const event = createInternalHookEvent("message", "preprocessed", "session-1", { + body: "🎤 [Audio]", + bodyForAgent: "[Audio] Transcript: Check https://example.com\n[Link summary: Example site]", + transcript: "Check https://example.com", + channelId: "telegram", + mediaType: "audio/ogg", + isGroup: false, + }); + await triggerInternalHook(event); + + const ctx = handler.mock.calls[0][0].context; + expect(ctx.transcript).toBe("Check https://example.com"); + expect(ctx.bodyForAgent).toContain("Link summary"); + expect(ctx.bodyForAgent).toContain("Transcript:"); + }); + + it("should fire for plain text messages without media", async () => { + const handler = vi.fn(); + registerInternalHook("message:preprocessed", handler); + + const event = createInternalHookEvent("message", "preprocessed", "session-1", { + body: "Just a text message", + bodyForAgent: "Just a text message", + channelId: "signal", + isGroup: false, + }); + await triggerInternalHook(event); + + expect(handler).toHaveBeenCalledOnce(); + const ctx = handler.mock.calls[0][0].context; + expect(ctx.transcript).toBeUndefined(); + expect(ctx.mediaType).toBeUndefined(); + expect(ctx.body).toBe("Just a text message"); + }); + }); + + describe("message:sent", () => { + it("should trigger handler registered for message:sent", async () => { + const handler = vi.fn(); + registerInternalHook("message:sent", handler); + + const event = createInternalHookEvent("message", "sent", "session-1", { + from: "bot:456", + to: "user:123", + content: "Here is my reply", + channelId: "telegram", + provider: "telegram", + }); + await triggerInternalHook(event); + + expect(handler).toHaveBeenCalledOnce(); + expect(handler.mock.calls[0][0].action).toBe("sent"); + expect(handler.mock.calls[0][0].context.content).toBe("Here is my reply"); + }); + + it("should include channel and routing context", async () => { + const handler = vi.fn(); + registerInternalHook("message:sent", 
handler); + + const event = createInternalHookEvent("message", "sent", "session-1", { + from: "bot:456", + to: "user:123", + content: "Reply text", + channelId: "discord", + conversationId: "channel:C123", + provider: "discord", + surface: "discord", + threadId: "thread-abc", + originatingChannel: "discord", + originatingTo: "channel:C123", + }); + await triggerInternalHook(event); + + const ctx = handler.mock.calls[0][0].context; + expect(ctx.channelId).toBe("discord"); + expect(ctx.conversationId).toBe("channel:C123"); + expect(ctx.threadId).toBe("thread-abc"); + }); + }); + + describe("general message handler", () => { + it("should receive all message event types (received, transcribed, preprocessed, sent)", async () => { + const events: InternalHookEvent[] = []; + registerInternalHook("message", (event) => { + events.push(event); + }); + + await triggerInternalHook( + createInternalHookEvent("message", "received", "s1", { content: "hi" }), + ); + await triggerInternalHook( + createInternalHookEvent("message", "transcribed", "s1", { transcript: "hello" }), + ); + await triggerInternalHook( + createInternalHookEvent("message", "preprocessed", "s1", { + body: "hello", + bodyForAgent: "hello", + }), + ); + await triggerInternalHook( + createInternalHookEvent("message", "sent", "s1", { content: "reply" }), + ); + + expect(events).toHaveLength(4); + expect(events[0].action).toBe("received"); + expect(events[1].action).toBe("transcribed"); + expect(events[2].action).toBe("preprocessed"); + expect(events[3].action).toBe("sent"); + }); + + it("should trigger both general and specific handlers for same event", async () => { + const generalHandler = vi.fn(); + const specificHandler = vi.fn(); + + registerInternalHook("message", generalHandler); + registerInternalHook("message:received", specificHandler); + + const event = createInternalHookEvent("message", "received", "s1", { content: "test" }); + await triggerInternalHook(event); + + 
expect(generalHandler).toHaveBeenCalledOnce(); + expect(specificHandler).toHaveBeenCalledOnce(); + }); + + it("should not trigger message:sent handler for message:received events", async () => { + const sentHandler = vi.fn(); + registerInternalHook("message:sent", sentHandler); + + await triggerInternalHook( + createInternalHookEvent("message", "received", "s1", { content: "hi" }), + ); + + expect(sentHandler).not.toHaveBeenCalled(); + }); + }); + + describe("error isolation", () => { + it("should not propagate handler errors to caller", async () => { + const consoleError = vi.spyOn(console, "error").mockImplementation(() => {}); + const badHandler = vi.fn(() => { + throw new Error("Hook exploded"); + }); + registerInternalHook("message:received", badHandler); + + const event = createInternalHookEvent("message", "received", "s1", { content: "test" }); + await expect(triggerInternalHook(event)).resolves.not.toThrow(); + + expect(consoleError).toHaveBeenCalledWith( + expect.stringContaining("Hook error"), + expect.stringContaining("Hook exploded"), + ); + consoleError.mockRestore(); + }); + + it("should continue running subsequent handlers after one fails", async () => { + const consoleError = vi.spyOn(console, "error").mockImplementation(() => {}); + const failHandler = vi.fn(() => { + throw new Error("First handler fails"); + }); + const successHandler = vi.fn(); + + registerInternalHook("message:received", failHandler); + registerInternalHook("message:received", successHandler); + + await triggerInternalHook( + createInternalHookEvent("message", "received", "s1", { content: "test" }), + ); + + expect(failHandler).toHaveBeenCalled(); + expect(successHandler).toHaveBeenCalled(); + consoleError.mockRestore(); + }); + + it("should isolate async handler errors", async () => { + const consoleError = vi.spyOn(console, "error").mockImplementation(() => {}); + const asyncFailHandler = vi.fn(async () => { + throw new Error("Async hook failed"); + }); + 
registerInternalHook("message:sent", asyncFailHandler); + + await expect( + triggerInternalHook(createInternalHookEvent("message", "sent", "s1", { content: "reply" })), + ).resolves.not.toThrow(); + + consoleError.mockRestore(); + }); + }); + + describe("event structure", () => { + it("should include timestamp on all message events", async () => { + const handler = vi.fn(); + registerInternalHook("message", handler); + + const before = new Date(); + await triggerInternalHook( + createInternalHookEvent("message", "received", "s1", { content: "hi" }), + ); + const after = new Date(); + + const event = handler.mock.calls[0][0] as InternalHookEvent; + expect(event.timestamp).toBeInstanceOf(Date); + expect(event.timestamp.getTime()).toBeGreaterThanOrEqual(before.getTime()); + expect(event.timestamp.getTime()).toBeLessThanOrEqual(after.getTime()); + }); + + it("should include messages array for hook responses", async () => { + const handler = vi.fn((event: InternalHookEvent) => { + event.messages.push("Echo: received your message"); + }); + registerInternalHook("message:received", handler); + + const event = createInternalHookEvent("message", "received", "s1", { content: "hello" }); + await triggerInternalHook(event); + + expect(event.messages).toContain("Echo: received your message"); + }); + + it("should preserve sessionKey across event lifecycle", async () => { + const events: InternalHookEvent[] = []; + registerInternalHook("message", (e) => events.push(e)); + + await triggerInternalHook( + createInternalHookEvent("message", "received", "agent:main:telegram:abc", { + content: "hi", + }), + ); + await triggerInternalHook( + createInternalHookEvent("message", "sent", "agent:main:telegram:abc", { + content: "reply", + }), + ); + + expect(events[0].sessionKey).toBe("agent:main:telegram:abc"); + expect(events[1].sessionKey).toBe("agent:main:telegram:abc"); + }); + }); +});