Files
openclaw/src/auto-reply/reply.block-streaming.test.ts
2026-03-24 08:37:00 +00:00

251 lines
7.8 KiB
TypeScript

import { beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import type { MsgContext } from "./templating.js";
const mocks = vi.hoisted(() => ({
resolveReplyDirectives: vi.fn(),
handleInlineActions: vi.fn(),
initSessionState: vi.fn(),
runPreparedReply: vi.fn(),
}));
vi.mock("../agents/agent-scope.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../agents/agent-scope.js")>();
return {
...actual,
resolveAgentDir: vi.fn(() => "/tmp/agent"),
resolveAgentWorkspaceDir: vi.fn(() => "/tmp/workspace"),
resolveSessionAgentId: vi.fn(() => "main"),
resolveAgentSkillsFilter: vi.fn(() => undefined),
};
});
vi.mock("../agents/model-selection.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../agents/model-selection.js")>();
return {
...actual,
resolveModelRefFromString: vi.fn(() => null),
};
});
vi.mock("../agents/timeout.js", () => ({
resolveAgentTimeoutMs: vi.fn(() => 60_000),
}));
vi.mock("../agents/workspace.js", () => ({
DEFAULT_AGENT_WORKSPACE_DIR: "/tmp/workspace",
ensureAgentWorkspace: vi.fn(async () => ({ dir: "/tmp/workspace" })),
}));
vi.mock("../channels/model-overrides.js", () => ({
resolveChannelModelOverride: vi.fn(() => undefined),
}));
vi.mock("../config/config.js", () => ({
loadConfig: vi.fn(() => ({})),
}));
vi.mock("../runtime.js", () => ({
defaultRuntime: { log: vi.fn(), error: vi.fn(), warn: vi.fn(), info: vi.fn() },
}));
vi.mock("./command-auth.js", () => ({
resolveCommandAuthorization: vi.fn(() => ({ isAuthorizedSender: true })),
}));
vi.mock("./reply/directive-handling.defaults.js", () => ({
resolveDefaultModel: vi.fn(() => ({
defaultProvider: "anthropic",
defaultModel: "claude-opus-4-5",
aliasIndex: new Map(),
})),
}));
vi.mock("./reply/inbound-context.js", () => ({
finalizeInboundContext: vi.fn((ctx: unknown) => ctx),
}));
vi.mock("./reply/session-reset-model.js", () => ({
applyResetModelOverride: vi.fn(async () => undefined),
}));
vi.mock("./reply/stage-sandbox-media.js", () => ({
stageSandboxMedia: vi.fn(async () => undefined),
}));
vi.mock("./reply/typing.js", () => ({
createTypingController: vi.fn(() => ({
onReplyStart: async () => undefined,
startTypingLoop: async () => undefined,
startTypingOnText: async () => undefined,
refreshTypingTtl: () => undefined,
isActive: () => false,
markRunComplete: () => undefined,
markDispatchIdle: () => undefined,
cleanup: () => undefined,
})),
}));
vi.mock("./reply/get-reply-directives.js", () => ({
resolveReplyDirectives: (...args: unknown[]) => mocks.resolveReplyDirectives(...args),
}));
vi.mock("./reply/get-reply-inline-actions.js", () => ({
handleInlineActions: (...args: unknown[]) => mocks.handleInlineActions(...args),
}));
vi.mock("./reply/session.js", () => ({
initSessionState: (...args: unknown[]) => mocks.initSessionState(...args),
}));
vi.mock("./reply/get-reply-run.js", () => ({
runPreparedReply: (...args: unknown[]) => mocks.runPreparedReply(...args),
}));
let getReplyFromConfig: typeof import("./reply/get-reply.js").getReplyFromConfig;
async function loadFreshGetReplyModuleForTest() {
vi.resetModules();
({ getReplyFromConfig } = await import("./reply/get-reply.js"));
}
function createTelegramMessage(messageSid: string): MsgContext {
return {
Body: "ping",
From: "+1004",
To: "+2000",
MessageSid: messageSid,
Provider: "telegram",
Surface: "telegram",
ChatType: "direct",
};
}
function createReplyConfig(streamMode?: "block"): OpenClawConfig {
return {
agents: {
defaults: {
model: { primary: "anthropic/claude-opus-4-5" },
workspace: "/tmp/workspace",
},
},
channels: { telegram: { allowFrom: ["*"], streamMode } },
session: { store: "/tmp/sessions.json" },
};
}
function createContinueDirectivesResult() {
return {
kind: "continue" as const,
result: {
commandSource: undefined,
command: {
surface: "telegram",
channel: "telegram",
channelId: "+2000",
ownerList: [],
senderIsOwner: true,
isAuthorizedSender: true,
senderId: "+1004",
abortKey: "telegram:+2000",
rawBodyNormalized: "ping",
commandBodyNormalized: "ping",
from: "+1004",
to: "+2000",
resetHookTriggered: false,
},
allowTextCommands: true,
skillCommands: [],
directives: {},
cleanedBody: "ping",
elevatedEnabled: false,
elevatedAllowed: false,
elevatedFailures: [],
defaultActivation: "always",
resolvedThinkLevel: undefined,
resolvedVerboseLevel: "off",
resolvedReasoningLevel: "off",
resolvedElevatedLevel: "off",
execOverrides: undefined,
blockStreamingEnabled: true,
blockReplyChunking: undefined,
resolvedBlockStreamingBreak: "message_end",
provider: "anthropic",
model: "claude-opus-4-5",
modelState: {
resolveDefaultThinkingLevel: async () => undefined,
},
contextTokens: 0,
inlineStatusRequested: false,
directiveAck: undefined,
perMessageQueueMode: undefined,
perMessageQueueOptions: undefined,
},
};
}
describe("block streaming", () => {
beforeEach(async () => {
await loadFreshGetReplyModuleForTest();
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
mocks.resolveReplyDirectives.mockReset();
mocks.handleInlineActions.mockReset();
mocks.initSessionState.mockReset();
mocks.runPreparedReply.mockReset();
mocks.resolveReplyDirectives.mockResolvedValue(createContinueDirectivesResult());
mocks.handleInlineActions.mockImplementation(async (params) => ({
kind: "continue",
directives: params.directives,
abortedLastRun: false,
}));
mocks.initSessionState.mockImplementation(async ({ ctx }: { ctx: MsgContext }) => ({
sessionCtx: {
...ctx,
CommandAuthorized: true,
},
sessionEntry: {},
previousSessionEntry: {},
sessionStore: {},
sessionKey: "agent:main:telegram:direct:+1004",
sessionId: "session-1",
isNewSession: true,
resetTriggered: false,
systemSent: false,
abortedLastRun: false,
storePath: "/tmp/sessions.json",
sessionScope: "per-sender",
groupResolution: undefined,
isGroup: false,
triggerBodyNormalized: "ping",
bodyStripped: "ping",
}));
});
it("handles ordering, timeout fallback, and telegram streamMode block", async () => {
const onReplyStart = vi.fn().mockResolvedValue(undefined);
const onBlockReply = vi.fn().mockResolvedValue(undefined);
mocks.runPreparedReply.mockImplementationOnce(async (params) => {
await params.opts?.onReplyStart?.();
await params.opts?.onBlockReply?.({ text: "first\n\nsecond" });
return undefined;
});
const res = await getReplyFromConfig(
createTelegramMessage("msg-123"),
{
onReplyStart,
onBlockReply,
disableBlockStreaming: false,
},
createReplyConfig(),
);
expect(res).toBeUndefined();
expect(mocks.runPreparedReply).toHaveBeenCalledTimes(1);
expect(onReplyStart).toHaveBeenCalledTimes(1);
expect(onBlockReply).toHaveBeenCalledWith({ text: "first\n\nsecond" });
const onBlockReplyStreamMode = vi.fn().mockResolvedValue(undefined);
mocks.runPreparedReply.mockImplementationOnce(async () => [{ text: "final" }]);
const resStreamMode = await getReplyFromConfig(
createTelegramMessage("msg-127"),
{
onBlockReply: onBlockReplyStreamMode,
},
createReplyConfig("block"),
);
const streamPayload = Array.isArray(resStreamMode) ? resStreamMode[0] : resStreamMode;
expect(streamPayload?.text).toBe("final");
expect(onBlockReplyStreamMode).not.toHaveBeenCalled();
});
});