diff --git a/CHANGELOG.md b/CHANGELOG.md index 2818244f606..9bfb892cfdb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai - Zalo Personal plugin (`@openclaw/zalouser`): rebuilt channel runtime to use native `zca-js` integration in-process, removing external CLI transport usage and keeping QR/login + send/listen flows fully inside OpenClaw. - Telegram/DM streaming: use `sendMessageDraft` for private preview streaming, keep reasoning/answer preview lanes separated in DM reasoning-stream mode. (#31824) Thanks @obviyus. - Telegram/voice mention gating: add optional `disableAudioPreflight` on group/topic config to skip mention-detection preflight transcription for inbound voice notes where operators want text-only mention checks. (#23067) Thanks @yangnim21029. +- Hooks/message lifecycle: add internal hook events `message:transcribed` and `message:preprocessed`, plus richer outbound `message:sent` context (`isGroup`, `groupId`) for group-conversation correlation and post-transcription automations. (#9859) Thanks @Drickon. - CLI/Config validation: add `openclaw config validate` (with `--json`) to validate config files before gateway startup, and include detailed invalid-key paths in startup invalid-config errors. (#31220) thanks @Sid-Qin. - Tools/Diffs: add PDF file output support and rendering quality customization controls (`fileQuality`, `fileScale`, `fileMaxWidth`) for generated diff artifacts, and document PDF as the preferred option when messaging channels compress images. (#31342) Thanks @gumadeiras. - README/Contributors: rank contributor avatars by composite score (commits + merged PRs + code LOC), excluding docs-only LOC to prevent bulk-generated files from inflating rankings. (#23970) Thanks @tyler6204. 
diff --git a/docs/automation/hooks.md b/docs/automation/hooks.md index 3280475f6d5..d34480f1ed3 100644 --- a/docs/automation/hooks.md +++ b/docs/automation/hooks.md @@ -299,6 +299,30 @@ Message events include rich context about the message: accountId?: string, // Provider account ID conversationId?: string, // Chat/conversation ID messageId?: string, // Message ID returned by the provider + isGroup?: boolean, // Whether this outbound message belongs to a group/channel context + groupId?: string, // Group/channel identifier for correlation with message:received +} + +// message:transcribed context +{ + body?: string, // Raw inbound body before enrichment + bodyForAgent?: string, // Enriched body visible to the agent + transcript: string, // Audio transcript text + channelId: string, // Channel (e.g., "telegram", "whatsapp") + conversationId?: string, + messageId?: string, +} + +// message:preprocessed context +{ + body?: string, // Raw inbound body + bodyForAgent?: string, // Final enriched body after media/link understanding + transcript?: string, // Transcript when audio was present + channelId: string, // Channel (e.g., "telegram", "whatsapp") + conversationId?: string, + messageId?: string, + isGroup?: boolean, + groupId?: string, } ``` diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 3f59e81f7d1..2b703a399f5 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -268,6 +268,7 @@ describe("dispatchReplyFromConfig", () => { Provider: "slack", AccountId: "acc-1", MessageThreadId: 123, + GroupChannel: "ops-room", OriginatingChannel: "telegram", OriginatingTo: "telegram:999", }); @@ -286,6 +287,8 @@ describe("dispatchReplyFromConfig", () => { to: "telegram:999", accountId: "acc-1", threadId: 123, + isGroup: true, + groupId: "telegram:999", }), ); }); diff --git a/src/auto-reply/reply/dispatch-from-config.ts 
b/src/auto-reply/reply/dispatch-from-config.ts index 835519e0cdc..2e3265b44e5 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -106,9 +106,6 @@ export async function dispatchReplyFromConfig(params: { const sessionKey = ctx.SessionKey; const startTime = diagnosticsEnabled ? Date.now() : 0; const canTrackSession = diagnosticsEnabled && Boolean(sessionKey); - const isGroup = Boolean(ctx.GroupSubject || ctx.GroupChannel); - const groupId = - ctx.From?.includes(":group:") || ctx.From?.includes(":channel:") ? ctx.From : undefined; const recordProcessed = ( outcome: "completed" | "skipped" | "error", @@ -180,6 +177,8 @@ export async function dispatchReplyFromConfig(params: { : ""; const channelId = (ctx.OriginatingChannel ?? ctx.Surface ?? ctx.Provider ?? "").toLowerCase(); const conversationId = ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? undefined; + const isGroup = Boolean(ctx.GroupSubject || ctx.GroupChannel); + const groupId = isGroup ? 
conversationId : undefined; // Trigger plugin hooks (fire-and-forget) if (hookRunner?.hasHooks("message_received")) { diff --git a/src/auto-reply/reply/get-reply.message-hooks.test.ts b/src/auto-reply/reply/get-reply.message-hooks.test.ts new file mode 100644 index 00000000000..c10604a9fd2 --- /dev/null +++ b/src/auto-reply/reply/get-reply.message-hooks.test.ts @@ -0,0 +1,236 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { MsgContext } from "../templating.js"; + +const mocks = vi.hoisted(() => ({ + applyMediaUnderstanding: vi.fn(async (..._args: unknown[]) => undefined), + applyLinkUnderstanding: vi.fn(async (..._args: unknown[]) => undefined), + createInternalHookEvent: vi.fn(), + triggerInternalHook: vi.fn(async (..._args: unknown[]) => undefined), + resolveReplyDirectives: vi.fn(), + initSessionState: vi.fn(), +})); + +vi.mock("../../agents/agent-scope.js", () => ({ + resolveAgentDir: vi.fn(() => "/tmp/agent"), + resolveAgentWorkspaceDir: vi.fn(() => "/tmp/workspace"), + resolveSessionAgentId: vi.fn(() => "main"), + resolveAgentSkillsFilter: vi.fn(() => undefined), +})); +vi.mock("../../agents/model-selection.js", () => ({ + resolveModelRefFromString: vi.fn(() => null), +})); +vi.mock("../../agents/timeout.js", () => ({ + resolveAgentTimeoutMs: vi.fn(() => 60000), +})); +vi.mock("../../agents/workspace.js", () => ({ + DEFAULT_AGENT_WORKSPACE_DIR: "/tmp/workspace", + ensureAgentWorkspace: vi.fn(async () => ({ dir: "/tmp/workspace" })), +})); +vi.mock("../../channels/model-overrides.js", () => ({ + resolveChannelModelOverride: vi.fn(() => undefined), +})); +vi.mock("../../config/config.js", () => ({ + loadConfig: vi.fn(() => ({})), +})); +vi.mock("../../globals.js", () => ({ + logVerbose: vi.fn(), +})); +vi.mock("../../hooks/internal-hooks.js", () => ({ + createInternalHookEvent: mocks.createInternalHookEvent, + triggerInternalHook: mocks.triggerInternalHook, +})); +vi.mock("../../link-understanding/apply.js", () => ({ + 
applyLinkUnderstanding: mocks.applyLinkUnderstanding, +})); +vi.mock("../../media-understanding/apply.js", () => ({ + applyMediaUnderstanding: mocks.applyMediaUnderstanding, +})); +vi.mock("../../runtime.js", () => ({ + defaultRuntime: { log: vi.fn() }, +})); +vi.mock("../command-auth.js", () => ({ + resolveCommandAuthorization: vi.fn(() => ({ isAuthorizedSender: true })), +})); +vi.mock("./commands-core.js", () => ({ + emitResetCommandHooks: vi.fn(async () => undefined), +})); +vi.mock("./directive-handling.js", () => ({ + resolveDefaultModel: vi.fn(() => ({ + defaultProvider: "openai", + defaultModel: "gpt-4o-mini", + aliasIndex: new Map(), + })), +})); +vi.mock("./get-reply-directives.js", () => ({ + resolveReplyDirectives: mocks.resolveReplyDirectives, +})); +vi.mock("./get-reply-inline-actions.js", () => ({ + handleInlineActions: vi.fn(async () => ({ kind: "reply", reply: { text: "ok" } })), +})); +vi.mock("./get-reply-run.js", () => ({ + runPreparedReply: vi.fn(async () => undefined), +})); +vi.mock("./inbound-context.js", () => ({ + finalizeInboundContext: vi.fn((ctx: unknown) => ctx), +})); +vi.mock("./session-reset-model.js", () => ({ + applyResetModelOverride: vi.fn(async () => undefined), +})); +vi.mock("./session.js", () => ({ + initSessionState: mocks.initSessionState, +})); +vi.mock("./stage-sandbox-media.js", () => ({ + stageSandboxMedia: vi.fn(async () => undefined), +})); +vi.mock("./typing.js", () => ({ + createTypingController: vi.fn(() => ({ + onReplyStart: async () => undefined, + startTypingLoop: async () => undefined, + startTypingOnText: async () => undefined, + refreshTypingTtl: () => undefined, + isActive: () => false, + markRunComplete: () => undefined, + markDispatchIdle: () => undefined, + cleanup: () => undefined, + })), +})); + +const { getReplyFromConfig } = await import("./get-reply.js"); + +function buildCtx(overrides: Partial<MsgContext> = {}): MsgContext { + return { + Provider: "telegram", + Surface: "telegram", + OriginatingChannel: 
"telegram", + OriginatingTo: "telegram:-100123", + ChatType: "group", + Body: "", + BodyForAgent: "", + RawBody: "", + CommandBody: "", + SessionKey: "agent:main:telegram:-100123", + From: "telegram:user:42", + To: "telegram:-100123", + GroupChannel: "ops", + Timestamp: 1710000000000, + ...overrides, + }; +} + +describe("getReplyFromConfig message hooks", () => { + beforeEach(() => { + delete process.env.OPENCLAW_TEST_FAST; + mocks.applyMediaUnderstanding.mockReset(); + mocks.applyLinkUnderstanding.mockReset(); + mocks.createInternalHookEvent.mockReset(); + mocks.triggerInternalHook.mockReset(); + mocks.resolveReplyDirectives.mockReset(); + mocks.initSessionState.mockReset(); + + mocks.applyMediaUnderstanding.mockImplementation(async (...args: unknown[]) => { + const { ctx } = args[0] as { ctx: MsgContext }; + ctx.Transcript = "voice transcript"; + ctx.Body = "[Audio]\nTranscript:\nvoice transcript"; + ctx.BodyForAgent = "[Audio]\nTranscript:\nvoice transcript"; + }); + mocks.applyLinkUnderstanding.mockResolvedValue(undefined); + mocks.createInternalHookEvent.mockImplementation( + (type: string, action: string, sessionKey: string, context: Record<string, unknown>) => ({ + type, + action, + sessionKey, + context, + timestamp: new Date(), + messages: [], + }), + ); + mocks.triggerInternalHook.mockResolvedValue(undefined); + mocks.resolveReplyDirectives.mockResolvedValue({ kind: "reply", reply: { text: "ok" } }); + mocks.initSessionState.mockResolvedValue({ + sessionCtx: {}, + sessionEntry: {}, + previousSessionEntry: {}, + sessionStore: {}, + sessionKey: "agent:main:telegram:-100123", + sessionId: "session-1", + isNewSession: false, + resetTriggered: false, + systemSent: false, + abortedLastRun: false, + storePath: "/tmp/sessions.json", + sessionScope: "per-chat", + groupResolution: undefined, + isGroup: true, + triggerBodyNormalized: "", + bodyStripped: "", + }); + }); + + it("emits transcribed + preprocessed hooks with enriched context", async () => { + const ctx = buildCtx(); + +
await getReplyFromConfig(ctx, undefined, {}); + + expect(mocks.createInternalHookEvent).toHaveBeenCalledTimes(2); + expect(mocks.createInternalHookEvent).toHaveBeenNthCalledWith( + 1, + "message", + "transcribed", + "agent:main:telegram:-100123", + expect.objectContaining({ + transcript: "voice transcript", + channelId: "telegram", + conversationId: "telegram:-100123", + }), + ); + expect(mocks.createInternalHookEvent).toHaveBeenNthCalledWith( + 2, + "message", + "preprocessed", + "agent:main:telegram:-100123", + expect.objectContaining({ + transcript: "voice transcript", + isGroup: true, + groupId: "telegram:-100123", + }), + ); + expect(mocks.triggerInternalHook).toHaveBeenCalledTimes(2); + }); + + it("emits only preprocessed when no transcript is produced", async () => { + mocks.applyMediaUnderstanding.mockImplementationOnce(async (...args: unknown[]) => { + const { ctx } = args[0] as { ctx: MsgContext }; + ctx.Transcript = undefined; + ctx.Body = ""; + ctx.BodyForAgent = ""; + }); + + await getReplyFromConfig(buildCtx(), undefined, {}); + + expect(mocks.createInternalHookEvent).toHaveBeenCalledTimes(1); + expect(mocks.createInternalHookEvent).toHaveBeenCalledWith( + "message", + "preprocessed", + "agent:main:telegram:-100123", + expect.any(Object), + ); + }); + + it("skips message hooks in fast test mode", async () => { + process.env.OPENCLAW_TEST_FAST = "1"; + + await getReplyFromConfig(buildCtx(), undefined, {}); + + expect(mocks.applyMediaUnderstanding).not.toHaveBeenCalled(); + expect(mocks.applyLinkUnderstanding).not.toHaveBeenCalled(); + expect(mocks.createInternalHookEvent).not.toHaveBeenCalled(); + expect(mocks.triggerInternalHook).not.toHaveBeenCalled(); + }); + + it("skips message hooks when SessionKey is unavailable", async () => { + await getReplyFromConfig(buildCtx({ SessionKey: undefined }), undefined, {}); + + expect(mocks.createInternalHookEvent).not.toHaveBeenCalled(); + expect(mocks.triggerInternalHook).not.toHaveBeenCalled(); + }); +}); diff 
--git a/src/auto-reply/reply/get-reply.ts b/src/auto-reply/reply/get-reply.ts index 2a4fa0d1b6e..f9f2dc8a90e 100644 --- a/src/auto-reply/reply/get-reply.ts +++ b/src/auto-reply/reply/get-reply.ts @@ -144,12 +144,16 @@ export async function getReplyFromConfig( finalized.Provider ?? "" ).toLowerCase(); + const hookSessionKey = finalized.SessionKey?.trim(); + const conversationId = finalized.OriginatingTo ?? finalized.To ?? finalized.From ?? undefined; + const isGroupConversation = Boolean(finalized.GroupSubject || finalized.GroupChannel); + const groupId = isGroupConversation ? conversationId : undefined; // Trigger message:transcribed hook after media understanding completes // Only fire if transcription actually occurred (skip in fast test mode or non-audio) - if (finalized.Transcript) { + if (!isFastTestEnv && hookSessionKey && finalized.Transcript) { void triggerInternalHook( - createInternalHookEvent("message", "transcribed", finalized.SessionKey ?? "", { + createInternalHookEvent("message", "transcribed", hookSessionKey, { from: finalized.From, to: finalized.To, body: finalized.Body, @@ -157,7 +161,7 @@ export async function getReplyFromConfig( transcript: finalized.Transcript, timestamp: finalized.Timestamp, channelId, - conversationId: finalized.OriginatingTo ?? finalized.To ?? finalized.From ?? undefined, + conversationId, messageId: finalized.MessageSid, senderId: finalized.SenderId, senderName: finalized.SenderName, @@ -176,34 +180,33 @@ export async function getReplyFromConfig( // Trigger message:preprocessed hook after all media + link understanding. // Fires for every message, giving hooks access to the fully enriched body // (transcripts, image descriptions, link summaries) before the agent sees it. - void triggerInternalHook( - createInternalHookEvent("message", "preprocessed", finalized.SessionKey ?? 
"", { - from: finalized.From, - to: finalized.To, - body: finalized.Body, - bodyForAgent: finalized.BodyForAgent, - transcript: finalized.Transcript, - timestamp: finalized.Timestamp, - channelId, - conversationId: finalized.OriginatingTo ?? finalized.To ?? finalized.From ?? undefined, - messageId: finalized.MessageSid, - senderId: finalized.SenderId, - senderName: finalized.SenderName, - senderUsername: finalized.SenderUsername, - provider: finalized.Provider, - surface: finalized.Surface, - mediaPath: finalized.MediaPath, - mediaType: finalized.MediaType, - isGroup: Boolean(finalized.GroupSubject || finalized.GroupChannel), - groupId: - finalized.From?.includes(":group:") || finalized.From?.includes(":channel:") - ? finalized.From - : undefined, - cfg, - }), - ).catch((err) => { - logVerbose(`get-reply: message:preprocessed internal hook failed: ${String(err)}`); - }); + if (!isFastTestEnv && hookSessionKey) { + void triggerInternalHook( + createInternalHookEvent("message", "preprocessed", hookSessionKey, { + from: finalized.From, + to: finalized.To, + body: finalized.Body, + bodyForAgent: finalized.BodyForAgent, + transcript: finalized.Transcript, + timestamp: finalized.Timestamp, + channelId, + conversationId, + messageId: finalized.MessageSid, + senderId: finalized.SenderId, + senderName: finalized.SenderName, + senderUsername: finalized.SenderUsername, + provider: finalized.Provider, + surface: finalized.Surface, + mediaPath: finalized.MediaPath, + mediaType: finalized.MediaType, + isGroup: isGroupConversation, + groupId, + cfg, + }), + ).catch((err) => { + logVerbose(`get-reply: message:preprocessed internal hook failed: ${String(err)}`); + }); + } const commandAuthorized = finalized.CommandAuthorized; resolveCommandAuthorization({ diff --git a/src/auto-reply/reply/route-reply.test.ts b/src/auto-reply/reply/route-reply.test.ts index e33fa1162d7..9b5d432149a 100644 --- a/src/auto-reply/reply/route-reply.test.ts +++ b/src/auto-reply/reply/route-reply.test.ts 
@@ -383,6 +383,8 @@ describe("routeReply", () => { channel: "slack", to: "channel:C123", sessionKey: "agent:main:main", + isGroup: true, + groupId: "channel:C123", cfg: {} as never, }); expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( @@ -390,6 +392,8 @@ describe("routeReply", () => { mirror: expect.objectContaining({ sessionKey: "agent:main:main", text: "hi", + isGroup: true, + groupId: "channel:C123", }), }), ); diff --git a/src/hooks/message-hooks.test.ts b/src/hooks/message-hooks.test.ts index b568197937b..9232e45c52e 100644 --- a/src/hooks/message-hooks.test.ts +++ b/src/hooks/message-hooks.test.ts @@ -278,7 +278,6 @@ describe("message hooks", () => { describe("error isolation", () => { it("should not propagate handler errors to caller", async () => { - const consoleError = vi.spyOn(console, "error").mockImplementation(() => {}); const badHandler = vi.fn(() => { throw new Error("Hook exploded"); }); @@ -286,16 +285,10 @@ describe("message hooks", () => { const event = createInternalHookEvent("message", "received", "s1", { content: "test" }); await expect(triggerInternalHook(event)).resolves.not.toThrow(); - - expect(consoleError).toHaveBeenCalledWith( - expect.stringContaining("Hook error"), - expect.stringContaining("Hook exploded"), - ); - consoleError.mockRestore(); + expect(badHandler).toHaveBeenCalledOnce(); }); it("should continue running subsequent handlers after one fails", async () => { - const consoleError = vi.spyOn(console, "error").mockImplementation(() => {}); const failHandler = vi.fn(() => { throw new Error("First handler fails"); }); @@ -310,11 +303,9 @@ describe("message hooks", () => { expect(failHandler).toHaveBeenCalled(); expect(successHandler).toHaveBeenCalled(); - consoleError.mockRestore(); }); it("should isolate async handler errors", async () => { - const consoleError = vi.spyOn(console, "error").mockImplementation(() => {}); const asyncFailHandler = vi.fn(async () => { throw new Error("Async hook failed"); }); @@ -323,8 
+314,7 @@ describe("message hooks", () => { await expect( triggerInternalHook(createInternalHookEvent("message", "sent", "s1", { content: "reply" })), ).resolves.not.toThrow(); - - consoleError.mockRestore(); + expect(asyncFailHandler).toHaveBeenCalledOnce(); }); }); diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index e23cdf496f7..50be7afd553 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -627,6 +627,8 @@ describe("deliverOutboundPayloads", () => { const { sendWhatsApp } = await runChunkedWhatsAppDelivery({ mirror: { sessionKey: "agent:main:main", + isGroup: true, + groupId: "whatsapp:group:123", }, }); expect(sendWhatsApp).toHaveBeenCalledTimes(2); @@ -643,6 +645,8 @@ describe("deliverOutboundPayloads", () => { channelId: "whatsapp", conversationId: "+1555", messageId: "w2", + isGroup: true, + groupId: "whatsapp:group:123", }), ); expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1);