feat(plugins): add before_agent_reply hook (claiming pattern) (#20067)

Merged via squash.

Prepared head SHA: e40dfbdfb9
Co-authored-by: JoshuaLelon <23615754+JoshuaLelon@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
This commit is contained in:
Joshua Lelon Mitchell
2026-04-01 15:31:11 -05:00
committed by GitHub
parent 017bc5261c
commit 7cb323d84f
8 changed files with 390 additions and 2 deletions

View File

@@ -0,0 +1,181 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { HookRunner } from "../../plugins/hooks.js";
import type { MsgContext } from "../templating.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import "./get-reply.test-runtime-mocks.js";
const mocks = vi.hoisted(() => ({
resolveReplyDirectives: vi.fn(),
handleInlineActions: vi.fn(),
initSessionState: vi.fn(),
hasHooks: vi.fn<HookRunner["hasHooks"]>(),
runBeforeAgentReply: vi.fn<HookRunner["runBeforeAgentReply"]>(),
}));
vi.mock("../../plugins/hook-runner-global.js", () => ({
getGlobalHookRunner: () =>
({
hasHooks: mocks.hasHooks,
runBeforeAgentReply: mocks.runBeforeAgentReply,
}) as unknown as HookRunner,
}));
vi.mock("./get-reply-directives.js", () => ({
resolveReplyDirectives: (...args: unknown[]) => mocks.resolveReplyDirectives(...args),
}));
vi.mock("./get-reply-inline-actions.js", () => ({
handleInlineActions: (...args: unknown[]) => mocks.handleInlineActions(...args),
}));
vi.mock("./session.js", () => ({
initSessionState: (...args: unknown[]) => mocks.initSessionState(...args),
}));
let getReplyFromConfig: typeof import("./get-reply.js").getReplyFromConfig;
async function loadFreshGetReplyModuleForTest() {
vi.resetModules();
({ getReplyFromConfig } = await import("./get-reply.js"));
}
function buildCtx(overrides: Partial<MsgContext> = {}): MsgContext {
return {
Provider: "telegram",
Surface: "telegram",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:-100123",
ChatType: "group",
Body: "hello world",
BodyForAgent: "hello world",
RawBody: "hello world",
CommandBody: "hello world",
BodyForCommands: "hello world",
SessionKey: "agent:main:telegram:-100123",
From: "telegram:user:42",
To: "telegram:-100123",
Timestamp: 1710000000000,
...overrides,
};
}
function createContinueDirectivesResult() {
return {
kind: "continue" as const,
result: {
commandSource: "text",
command: {
surface: "telegram",
channel: "telegram",
channelId: "telegram",
ownerList: [],
senderIsOwner: false,
isAuthorizedSender: true,
senderId: "42",
abortKey: "agent:main:telegram:-100123",
rawBodyNormalized: "hello world",
commandBodyNormalized: "hello world",
from: "telegram:user:42",
to: "telegram:-100123",
resetHookTriggered: false,
},
allowTextCommands: true,
skillCommands: [],
directives: {},
cleanedBody: "hello world",
elevatedEnabled: false,
elevatedAllowed: false,
elevatedFailures: [],
defaultActivation: "always",
resolvedThinkLevel: undefined,
resolvedVerboseLevel: "off",
resolvedReasoningLevel: "off",
resolvedElevatedLevel: "off",
execOverrides: undefined,
blockStreamingEnabled: false,
blockReplyChunking: undefined,
resolvedBlockStreamingBreak: undefined,
provider: "openai",
model: "gpt-4o-mini",
modelState: {
resolveDefaultThinkingLevel: async () => undefined,
},
contextTokens: 0,
inlineStatusRequested: false,
directiveAck: undefined,
perMessageQueueMode: undefined,
perMessageQueueOptions: undefined,
},
};
}
describe("getReplyFromConfig before_agent_reply wiring", () => {
beforeEach(async () => {
await loadFreshGetReplyModuleForTest();
mocks.resolveReplyDirectives.mockReset();
mocks.handleInlineActions.mockReset();
mocks.initSessionState.mockReset();
mocks.hasHooks.mockReset();
mocks.runBeforeAgentReply.mockReset();
mocks.initSessionState.mockResolvedValue({
sessionCtx: buildCtx({
OriginatingChannel: "Telegram",
Provider: "telegram",
}),
sessionEntry: {},
previousSessionEntry: {},
sessionStore: {},
sessionKey: "agent:main:telegram:-100123",
sessionId: "session-1",
isNewSession: false,
resetTriggered: false,
systemSent: false,
abortedLastRun: false,
storePath: "/tmp/sessions.json",
sessionScope: "per-chat",
groupResolution: undefined,
isGroup: true,
triggerBodyNormalized: "hello world",
bodyStripped: "hello world",
});
mocks.resolveReplyDirectives.mockResolvedValue(createContinueDirectivesResult());
mocks.handleInlineActions.mockResolvedValue({
kind: "continue",
directives: {},
abortedLastRun: false,
});
mocks.hasHooks.mockImplementation((hookName) => hookName === "before_agent_reply");
});
it("returns a plugin reply and invokes the hook after inline actions", async () => {
mocks.runBeforeAgentReply.mockResolvedValue({
handled: true,
reply: { text: "plugin reply" },
});
const result = await getReplyFromConfig(buildCtx(), undefined, {});
expect(result).toEqual({ text: "plugin reply" });
expect(mocks.runBeforeAgentReply).toHaveBeenCalledWith(
{ cleanedBody: "hello world" },
expect.objectContaining({
agentId: "main",
sessionKey: "agent:main:telegram:-100123",
sessionId: "session-1",
workspaceDir: "/tmp/workspace",
messageProvider: "telegram",
trigger: "user",
channelId: "telegram",
}),
);
expect(mocks.handleInlineActions.mock.invocationCallOrder[0]).toBeLessThan(
mocks.runBeforeAgentReply.mock.invocationCallOrder[0] ?? 0,
);
});
it("falls back to NO_REPLY when the hook claims without a reply payload", async () => {
mocks.runBeforeAgentReply.mockResolvedValue({ handled: true });
const result = await getReplyFromConfig(buildCtx(), undefined, {});
expect(result).toEqual({ text: SILENT_REPLY_TOKEN });
});
});

View File

@@ -44,6 +44,20 @@ function loadStageSandboxMediaRuntime() {
return stageSandboxMediaRuntimePromise;
}
let hookRunnerGlobalPromise: Promise<typeof import("../../plugins/hook-runner-global.js")> | null =
null;
let originRoutingPromise: Promise<typeof import("./origin-routing.js")> | null = null;
function loadHookRunnerGlobal() {
hookRunnerGlobalPromise ??= import("../../plugins/hook-runner-global.js");
return hookRunnerGlobalPromise;
}
function loadOriginRouting() {
originRoutingPromise ??= import("./origin-routing.js");
return originRoutingPromise;
}
function mergeSkillFilters(channelFilter?: string[], agentFilter?: string[]): string[] | undefined {
const normalize = (list?: string[]) => {
if (!Array.isArray(list)) {
@@ -419,6 +433,32 @@ export async function getReplyFromConfig(
directives = inlineActionResult.directives;
abortedLastRun = inlineActionResult.abortedLastRun ?? abortedLastRun;
// Allow plugins to intercept and return a synthetic reply before the LLM runs.
const { getGlobalHookRunner } = await loadHookRunnerGlobal();
const hookRunner = getGlobalHookRunner();
if (hookRunner?.hasHooks("before_agent_reply")) {
const { resolveOriginMessageProvider } = await loadOriginRouting();
const hookMessageProvider = resolveOriginMessageProvider({
originatingChannel: sessionCtx.OriginatingChannel,
provider: sessionCtx.Provider,
});
const hookResult = await hookRunner.runBeforeAgentReply(
{ cleanedBody },
{
agentId,
sessionKey: agentSessionKey,
sessionId,
workspaceDir,
messageProvider: hookMessageProvider,
trigger: opts?.isHeartbeat ? "heartbeat" : "user",
channelId: hookMessageProvider,
},
);
if (hookResult?.handled) {
return hookResult.reply ?? { text: SILENT_REPLY_TOKEN };
}
}
if (sessionKey && hasInboundMedia(ctx)) {
const { stageSandboxMedia } = await loadStageSandboxMediaRuntime();
await stageSandboxMedia({