fix: harden message hook session/group context and add integration coverage (#9859) (thanks @Drickon)

This commit is contained in:
Peter Steinberger
2026-03-02 22:34:27 +00:00
parent b5102ba4f9
commit 29dde80c3e
9 changed files with 310 additions and 46 deletions

View File

@@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai
- Zalo Personal plugin (`@openclaw/zalouser`): rebuilt channel runtime to use native `zca-js` integration in-process, removing external CLI transport usage and keeping QR/login + send/listen flows fully inside OpenClaw.
- Telegram/DM streaming: use `sendMessageDraft` for private preview streaming, keep reasoning/answer preview lanes separated in DM reasoning-stream mode. (#31824) Thanks @obviyus.
- Telegram/voice mention gating: add optional `disableAudioPreflight` on group/topic config to skip mention-detection preflight transcription for inbound voice notes where operators want text-only mention checks. (#23067) Thanks @yangnim21029.
- Hooks/message lifecycle: add internal hook events `message:transcribed` and `message:preprocessed`, plus richer outbound `message:sent` context (`isGroup`, `groupId`) for group-conversation correlation and post-transcription automations. (#9859) Thanks @Drickon.
- CLI/Config validation: add `openclaw config validate` (with `--json`) to validate config files before gateway startup, and include detailed invalid-key paths in startup invalid-config errors. (#31220) thanks @Sid-Qin.
- Tools/Diffs: add PDF file output support and rendering quality customization controls (`fileQuality`, `fileScale`, `fileMaxWidth`) for generated diff artifacts, and document PDF as the preferred option when messaging channels compress images. (#31342) Thanks @gumadeiras.
- README/Contributors: rank contributor avatars by composite score (commits + merged PRs + code LOC), excluding docs-only LOC to prevent bulk-generated files from inflating rankings. (#23970) Thanks @tyler6204.

View File

@@ -299,6 +299,30 @@ Message events include rich context about the message:
accountId?: string, // Provider account ID
conversationId?: string, // Chat/conversation ID
messageId?: string, // Message ID returned by the provider
isGroup?: boolean, // Whether this outbound message belongs to a group/channel context
groupId?: string, // Group/channel identifier for correlation with message:received
}
// message:transcribed context
{
body?: string, // Raw inbound body before enrichment
bodyForAgent?: string, // Enriched body visible to the agent
transcript: string, // Audio transcript text
channelId: string, // Channel (e.g., "telegram", "whatsapp")
conversationId?: string,
messageId?: string,
}
// message:preprocessed context
{
body?: string, // Raw inbound body
bodyForAgent?: string, // Final enriched body after media/link understanding
transcript?: string, // Transcript when audio was present
channelId: string, // Channel (e.g., "telegram", "whatsapp")
conversationId?: string,
messageId?: string,
isGroup?: boolean,
groupId?: string,
}
```

View File

@@ -268,6 +268,7 @@ describe("dispatchReplyFromConfig", () => {
Provider: "slack",
AccountId: "acc-1",
MessageThreadId: 123,
GroupChannel: "ops-room",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:999",
});
@@ -286,6 +287,8 @@ describe("dispatchReplyFromConfig", () => {
to: "telegram:999",
accountId: "acc-1",
threadId: 123,
isGroup: true,
groupId: "telegram:999",
}),
);
});

View File

@@ -106,9 +106,6 @@ export async function dispatchReplyFromConfig(params: {
const sessionKey = ctx.SessionKey;
const startTime = diagnosticsEnabled ? Date.now() : 0;
const canTrackSession = diagnosticsEnabled && Boolean(sessionKey);
const isGroup = Boolean(ctx.GroupSubject || ctx.GroupChannel);
const groupId =
ctx.From?.includes(":group:") || ctx.From?.includes(":channel:") ? ctx.From : undefined;
const recordProcessed = (
outcome: "completed" | "skipped" | "error",
@@ -180,6 +177,8 @@ export async function dispatchReplyFromConfig(params: {
: "";
const channelId = (ctx.OriginatingChannel ?? ctx.Surface ?? ctx.Provider ?? "").toLowerCase();
const conversationId = ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? undefined;
const isGroup = Boolean(ctx.GroupSubject || ctx.GroupChannel);
const groupId = isGroup ? conversationId : undefined;
// Trigger plugin hooks (fire-and-forget)
if (hookRunner?.hasHooks("message_received")) {

View File

@@ -0,0 +1,236 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { MsgContext } from "../templating.js";
const mocks = vi.hoisted(() => ({
applyMediaUnderstanding: vi.fn(async (..._args: unknown[]) => undefined),
applyLinkUnderstanding: vi.fn(async (..._args: unknown[]) => undefined),
createInternalHookEvent: vi.fn(),
triggerInternalHook: vi.fn(async (..._args: unknown[]) => undefined),
resolveReplyDirectives: vi.fn(),
initSessionState: vi.fn(),
}));
vi.mock("../../agents/agent-scope.js", () => ({
resolveAgentDir: vi.fn(() => "/tmp/agent"),
resolveAgentWorkspaceDir: vi.fn(() => "/tmp/workspace"),
resolveSessionAgentId: vi.fn(() => "main"),
resolveAgentSkillsFilter: vi.fn(() => undefined),
}));
vi.mock("../../agents/model-selection.js", () => ({
resolveModelRefFromString: vi.fn(() => null),
}));
vi.mock("../../agents/timeout.js", () => ({
resolveAgentTimeoutMs: vi.fn(() => 60000),
}));
vi.mock("../../agents/workspace.js", () => ({
DEFAULT_AGENT_WORKSPACE_DIR: "/tmp/workspace",
ensureAgentWorkspace: vi.fn(async () => ({ dir: "/tmp/workspace" })),
}));
vi.mock("../../channels/model-overrides.js", () => ({
resolveChannelModelOverride: vi.fn(() => undefined),
}));
vi.mock("../../config/config.js", () => ({
loadConfig: vi.fn(() => ({})),
}));
vi.mock("../../globals.js", () => ({
logVerbose: vi.fn(),
}));
vi.mock("../../hooks/internal-hooks.js", () => ({
createInternalHookEvent: mocks.createInternalHookEvent,
triggerInternalHook: mocks.triggerInternalHook,
}));
vi.mock("../../link-understanding/apply.js", () => ({
applyLinkUnderstanding: mocks.applyLinkUnderstanding,
}));
vi.mock("../../media-understanding/apply.js", () => ({
applyMediaUnderstanding: mocks.applyMediaUnderstanding,
}));
vi.mock("../../runtime.js", () => ({
defaultRuntime: { log: vi.fn() },
}));
vi.mock("../command-auth.js", () => ({
resolveCommandAuthorization: vi.fn(() => ({ isAuthorizedSender: true })),
}));
vi.mock("./commands-core.js", () => ({
emitResetCommandHooks: vi.fn(async () => undefined),
}));
vi.mock("./directive-handling.js", () => ({
resolveDefaultModel: vi.fn(() => ({
defaultProvider: "openai",
defaultModel: "gpt-4o-mini",
aliasIndex: new Map(),
})),
}));
vi.mock("./get-reply-directives.js", () => ({
resolveReplyDirectives: mocks.resolveReplyDirectives,
}));
vi.mock("./get-reply-inline-actions.js", () => ({
handleInlineActions: vi.fn(async () => ({ kind: "reply", reply: { text: "ok" } })),
}));
vi.mock("./get-reply-run.js", () => ({
runPreparedReply: vi.fn(async () => undefined),
}));
vi.mock("./inbound-context.js", () => ({
finalizeInboundContext: vi.fn((ctx: unknown) => ctx),
}));
vi.mock("./session-reset-model.js", () => ({
applyResetModelOverride: vi.fn(async () => undefined),
}));
vi.mock("./session.js", () => ({
initSessionState: mocks.initSessionState,
}));
vi.mock("./stage-sandbox-media.js", () => ({
stageSandboxMedia: vi.fn(async () => undefined),
}));
vi.mock("./typing.js", () => ({
createTypingController: vi.fn(() => ({
onReplyStart: async () => undefined,
startTypingLoop: async () => undefined,
startTypingOnText: async () => undefined,
refreshTypingTtl: () => undefined,
isActive: () => false,
markRunComplete: () => undefined,
markDispatchIdle: () => undefined,
cleanup: () => undefined,
})),
}));
const { getReplyFromConfig } = await import("./get-reply.js");
function buildCtx(overrides: Partial<MsgContext> = {}): MsgContext {
return {
Provider: "telegram",
Surface: "telegram",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:-100123",
ChatType: "group",
Body: "<media:audio>",
BodyForAgent: "<media:audio>",
RawBody: "<media:audio>",
CommandBody: "<media:audio>",
SessionKey: "agent:main:telegram:-100123",
From: "telegram:user:42",
To: "telegram:-100123",
GroupChannel: "ops",
Timestamp: 1710000000000,
...overrides,
};
}
describe("getReplyFromConfig message hooks", () => {
beforeEach(() => {
delete process.env.OPENCLAW_TEST_FAST;
mocks.applyMediaUnderstanding.mockReset();
mocks.applyLinkUnderstanding.mockReset();
mocks.createInternalHookEvent.mockReset();
mocks.triggerInternalHook.mockReset();
mocks.resolveReplyDirectives.mockReset();
mocks.initSessionState.mockReset();
mocks.applyMediaUnderstanding.mockImplementation(async (...args: unknown[]) => {
const { ctx } = args[0] as { ctx: MsgContext };
ctx.Transcript = "voice transcript";
ctx.Body = "[Audio]\nTranscript:\nvoice transcript";
ctx.BodyForAgent = "[Audio]\nTranscript:\nvoice transcript";
});
mocks.applyLinkUnderstanding.mockResolvedValue(undefined);
mocks.createInternalHookEvent.mockImplementation(
(type: string, action: string, sessionKey: string, context: Record<string, unknown>) => ({
type,
action,
sessionKey,
context,
timestamp: new Date(),
messages: [],
}),
);
mocks.triggerInternalHook.mockResolvedValue(undefined);
mocks.resolveReplyDirectives.mockResolvedValue({ kind: "reply", reply: { text: "ok" } });
mocks.initSessionState.mockResolvedValue({
sessionCtx: {},
sessionEntry: {},
previousSessionEntry: {},
sessionStore: {},
sessionKey: "agent:main:telegram:-100123",
sessionId: "session-1",
isNewSession: false,
resetTriggered: false,
systemSent: false,
abortedLastRun: false,
storePath: "/tmp/sessions.json",
sessionScope: "per-chat",
groupResolution: undefined,
isGroup: true,
triggerBodyNormalized: "",
bodyStripped: "",
});
});
it("emits transcribed + preprocessed hooks with enriched context", async () => {
const ctx = buildCtx();
await getReplyFromConfig(ctx, undefined, {});
expect(mocks.createInternalHookEvent).toHaveBeenCalledTimes(2);
expect(mocks.createInternalHookEvent).toHaveBeenNthCalledWith(
1,
"message",
"transcribed",
"agent:main:telegram:-100123",
expect.objectContaining({
transcript: "voice transcript",
channelId: "telegram",
conversationId: "telegram:-100123",
}),
);
expect(mocks.createInternalHookEvent).toHaveBeenNthCalledWith(
2,
"message",
"preprocessed",
"agent:main:telegram:-100123",
expect.objectContaining({
transcript: "voice transcript",
isGroup: true,
groupId: "telegram:-100123",
}),
);
expect(mocks.triggerInternalHook).toHaveBeenCalledTimes(2);
});
it("emits only preprocessed when no transcript is produced", async () => {
mocks.applyMediaUnderstanding.mockImplementationOnce(async (...args: unknown[]) => {
const { ctx } = args[0] as { ctx: MsgContext };
ctx.Transcript = undefined;
ctx.Body = "<media:audio>";
ctx.BodyForAgent = "<media:audio>";
});
await getReplyFromConfig(buildCtx(), undefined, {});
expect(mocks.createInternalHookEvent).toHaveBeenCalledTimes(1);
expect(mocks.createInternalHookEvent).toHaveBeenCalledWith(
"message",
"preprocessed",
"agent:main:telegram:-100123",
expect.any(Object),
);
});
it("skips message hooks in fast test mode", async () => {
process.env.OPENCLAW_TEST_FAST = "1";
await getReplyFromConfig(buildCtx(), undefined, {});
expect(mocks.applyMediaUnderstanding).not.toHaveBeenCalled();
expect(mocks.applyLinkUnderstanding).not.toHaveBeenCalled();
expect(mocks.createInternalHookEvent).not.toHaveBeenCalled();
expect(mocks.triggerInternalHook).not.toHaveBeenCalled();
});
it("skips message hooks when SessionKey is unavailable", async () => {
await getReplyFromConfig(buildCtx({ SessionKey: undefined }), undefined, {});
expect(mocks.createInternalHookEvent).not.toHaveBeenCalled();
expect(mocks.triggerInternalHook).not.toHaveBeenCalled();
});
});

View File

@@ -144,12 +144,16 @@ export async function getReplyFromConfig(
finalized.Provider ??
""
).toLowerCase();
const hookSessionKey = finalized.SessionKey?.trim();
const conversationId = finalized.OriginatingTo ?? finalized.To ?? finalized.From ?? undefined;
const isGroupConversation = Boolean(finalized.GroupSubject || finalized.GroupChannel);
const groupId = isGroupConversation ? conversationId : undefined;
// Trigger message:transcribed hook after media understanding completes
// Only fire if transcription actually occurred (skip in fast test mode or non-audio)
if (finalized.Transcript) {
if (!isFastTestEnv && hookSessionKey && finalized.Transcript) {
void triggerInternalHook(
createInternalHookEvent("message", "transcribed", finalized.SessionKey ?? "", {
createInternalHookEvent("message", "transcribed", hookSessionKey, {
from: finalized.From,
to: finalized.To,
body: finalized.Body,
@@ -157,7 +161,7 @@ export async function getReplyFromConfig(
transcript: finalized.Transcript,
timestamp: finalized.Timestamp,
channelId,
conversationId: finalized.OriginatingTo ?? finalized.To ?? finalized.From ?? undefined,
conversationId,
messageId: finalized.MessageSid,
senderId: finalized.SenderId,
senderName: finalized.SenderName,
@@ -176,34 +180,33 @@ export async function getReplyFromConfig(
// Trigger message:preprocessed hook after all media + link understanding.
// Fires for every message, giving hooks access to the fully enriched body
// (transcripts, image descriptions, link summaries) before the agent sees it.
void triggerInternalHook(
createInternalHookEvent("message", "preprocessed", finalized.SessionKey ?? "", {
from: finalized.From,
to: finalized.To,
body: finalized.Body,
bodyForAgent: finalized.BodyForAgent,
transcript: finalized.Transcript,
timestamp: finalized.Timestamp,
channelId,
conversationId: finalized.OriginatingTo ?? finalized.To ?? finalized.From ?? undefined,
messageId: finalized.MessageSid,
senderId: finalized.SenderId,
senderName: finalized.SenderName,
senderUsername: finalized.SenderUsername,
provider: finalized.Provider,
surface: finalized.Surface,
mediaPath: finalized.MediaPath,
mediaType: finalized.MediaType,
isGroup: Boolean(finalized.GroupSubject || finalized.GroupChannel),
groupId:
finalized.From?.includes(":group:") || finalized.From?.includes(":channel:")
? finalized.From
: undefined,
cfg,
}),
).catch((err) => {
logVerbose(`get-reply: message:preprocessed internal hook failed: ${String(err)}`);
});
if (!isFastTestEnv && hookSessionKey) {
void triggerInternalHook(
createInternalHookEvent("message", "preprocessed", hookSessionKey, {
from: finalized.From,
to: finalized.To,
body: finalized.Body,
bodyForAgent: finalized.BodyForAgent,
transcript: finalized.Transcript,
timestamp: finalized.Timestamp,
channelId,
conversationId,
messageId: finalized.MessageSid,
senderId: finalized.SenderId,
senderName: finalized.SenderName,
senderUsername: finalized.SenderUsername,
provider: finalized.Provider,
surface: finalized.Surface,
mediaPath: finalized.MediaPath,
mediaType: finalized.MediaType,
isGroup: isGroupConversation,
groupId,
cfg,
}),
).catch((err) => {
logVerbose(`get-reply: message:preprocessed internal hook failed: ${String(err)}`);
});
}
const commandAuthorized = finalized.CommandAuthorized;
resolveCommandAuthorization({

View File

@@ -383,6 +383,8 @@ describe("routeReply", () => {
channel: "slack",
to: "channel:C123",
sessionKey: "agent:main:main",
isGroup: true,
groupId: "channel:C123",
cfg: {} as never,
});
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
@@ -390,6 +392,8 @@ describe("routeReply", () => {
mirror: expect.objectContaining({
sessionKey: "agent:main:main",
text: "hi",
isGroup: true,
groupId: "channel:C123",
}),
}),
);

View File

@@ -278,7 +278,6 @@ describe("message hooks", () => {
describe("error isolation", () => {
it("should not propagate handler errors to caller", async () => {
const consoleError = vi.spyOn(console, "error").mockImplementation(() => {});
const badHandler = vi.fn(() => {
throw new Error("Hook exploded");
});
@@ -286,16 +285,10 @@ describe("message hooks", () => {
const event = createInternalHookEvent("message", "received", "s1", { content: "test" });
await expect(triggerInternalHook(event)).resolves.not.toThrow();
expect(consoleError).toHaveBeenCalledWith(
expect.stringContaining("Hook error"),
expect.stringContaining("Hook exploded"),
);
consoleError.mockRestore();
expect(badHandler).toHaveBeenCalledOnce();
});
it("should continue running subsequent handlers after one fails", async () => {
const consoleError = vi.spyOn(console, "error").mockImplementation(() => {});
const failHandler = vi.fn(() => {
throw new Error("First handler fails");
});
@@ -310,11 +303,9 @@ describe("message hooks", () => {
expect(failHandler).toHaveBeenCalled();
expect(successHandler).toHaveBeenCalled();
consoleError.mockRestore();
});
it("should isolate async handler errors", async () => {
const consoleError = vi.spyOn(console, "error").mockImplementation(() => {});
const asyncFailHandler = vi.fn(async () => {
throw new Error("Async hook failed");
});
@@ -323,8 +314,7 @@ describe("message hooks", () => {
await expect(
triggerInternalHook(createInternalHookEvent("message", "sent", "s1", { content: "reply" })),
).resolves.not.toThrow();
consoleError.mockRestore();
expect(asyncFailHandler).toHaveBeenCalledOnce();
});
});

View File

@@ -627,6 +627,8 @@ describe("deliverOutboundPayloads", () => {
const { sendWhatsApp } = await runChunkedWhatsAppDelivery({
mirror: {
sessionKey: "agent:main:main",
isGroup: true,
groupId: "whatsapp:group:123",
},
});
expect(sendWhatsApp).toHaveBeenCalledTimes(2);
@@ -643,6 +645,8 @@ describe("deliverOutboundPayloads", () => {
channelId: "whatsapp",
conversationId: "+1555",
messageId: "w2",
isGroup: true,
groupId: "whatsapp:group:123",
}),
);
expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1);