欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

用Netty手写一个RPC远程过程调用框架

程序员文章站 2022-03-26 17:05:19
在上篇初步介绍Netty的一些概念和执行流程后,接下来废话不多说,直接撸代码,用Netty写一个RPC框架,用Netty来实现远程过程调用。1.这里先创建三个工程Maven工程:顾名思义,每个工程的作用显而易见。......

 接下来废话不多说,直接撸代码。先用Netty写一个RPC框架,实现远程过程调用,来推开Netty实用场景的大门。后续还会接上用Netty来实现聊天室、心跳机制以及用Netty手写一个雏形的Dubbo框架。
**

手写RPC框架

**
RPC是远程过程调用的简写,是一个协议,处于网络通信协议的第五层:会话层,其下就是TCP/IP协议,建立在其基础上的通信会话协议。RPC定义了交互的模式,而应用程序使用这些模式,来访问其他服务器的方法,并不需要关系具体的网络上的细节。 RPC采用C/S模式。请求程序即客户机,而服务提供程序就是一个服务器。首先,客户端调用进程发送一个带有请求参数的调用信息到服务端,然后等待应答信息。在服务端,进程一直保持睡眠状态直到接收到调用信息为止。当一个调用信息到达,服务器根据请求参数进行计算,并将结果发送给发出请求的客户端,然后等待下一个调用信息。客户端调用进程接收到答复信息,获得调用结果,然后继续发出下一次调用。

废话不多说,就是干。
RPC框架说明:
I.用户将业务接口通知到Server与Client,这里要注意的是***业务接口***也就是***服务名称***。
II. 用户只需将业务接口的实现类写入到Server端的指定包下,并且在server端对该包下的所有实现类进行发布。
III. Client端只需根据服务名(业务接口名)就可获取到Server端发布的服务提供者(动态代理),然后进行远程调用,获得server端实现类执行的结果。
需要的项目工程如下:
用Netty手写一个RPC远程过程调用框架

1.创建API工程,定义服务接口
(1)创建一个普通的maven工程,导入依赖,这里仅需lombok,不用lombok也行,随性。

 ```xml
<dependencies>
        <!--lombok依赖-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
            <scope>provided</scope>
        </dependency>

    </dependencies> 
(2) 定义业务接口和常量类

```java
/**
 *业务接口,很随性地搞点事情
 */
public interface SomeService {
	String doSome(String some);
}
/**
 *客户端发送给服务端的服务调用信息
 */
@Data
public class InvokeMessage implements Serializable {
	private static final long serialVersionUID = 1L;
	//服务名称(接口名称)
    private String serviceName;
    //方法名称
    private String methodName;
    //方法参数类型列表
    private Class<?>[] paramTypes;
    //方法参数值列表
    private Object[] paramValues;
 } 

2.创建server端
(1)创建一个maven工程,依赖:

<dependencies> <!-- netty-all依赖 --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.36.Final</version> </dependency> <!--API工程依赖 --> <dependency> <groupId>com.netty</groupId> <artifactId>rpc-api</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> <!--lombok依赖 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.6</version> <scope>provided</scope> </dependency> </dependencies> 

(2)定义接口实现类,提供业务产出

/**
 * 服务提供者
 *
 */ public class SomeServiceImpl implements SomeService { public String doSome(String anything) { return "欢迎来搞事,就是干~ "+anything; } } 

(3)定义服务器类即server,主要实现服务的注册、发布

public class RpcServer { // 服务注册表:Map(指定包下的实现类的实例存放到这个集合中) private Map<String, Object> registryMap = new HashMap<>(); // 创建List(指定包中的class加载到这个集合中) private List<String> classCache = new ArrayList<>(); private static int port=8888; // 将指定包下的所有业务接口实现类进行发布 public void publish(String providerPackage) throws Exception { // 扫描指定包下的所有实现类 getProviderClass(providerPackage); // 提供者注册 doRegister(); NioEventLoopGroup parentGroup = new NioEventLoopGroup(); NioEventLoopGroup childGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(parentGroup, childGroup) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ObjectEncoder()); pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); pipeline.addLast(new RpcServerHandler(registryMap)); } }); ChannelFuture future = bootstrap.bind(port).sync(); System.out.println("微服务已注册成功,port:"+port); future.channel().closeFuture().sync(); } finally { parentGroup.shutdownGracefully(); childGroup.shutdownGracefully(); } } // 扫描指定包下的所有实现类:要将指定包下的class添加到一个集合 // providerPackage的形式如:com.abc.service public void getProviderClass(String providerPackage) { // 获取指定包的资源对象 URL resource = this.getClass().getClassLoader() // 获取到类加载器 // 将包中的点(.)替换为路径中的/ .getResource(providerPackage.replaceAll("\\.", "/")); // 获取路径文件 File dir = new File(resource.getFile()); for(File file : dir.listFiles()) { // 若当前遍历的是个目录,则递归调用 if(file.isDirectory()) { getProviderClass(providerPackage + "." + file.getName()); // 若当前遍历的是一个.class文件,则将文件名中的.class扩展名去掉 } else if(file.getName().endsWith(".class")){ // 获取到文件名,即类的简单类名 String fileName = file.getName().replace(".class", "").trim(); // 将实现类的全限定性类名添加到classCache中 classCache.add(providerPackage + "." + fileName); } } System.out.println("classCache:"+classCache); } // 提供者注册:将指定包中的实现类实例化后存放到一个map中 // key为实现类的接口名,value为实现类实例 private void doRegister() throws Exception { // 若指定包中没有类,则无需注册 if(classCache.size() == 0) return; // 遍历缓存集合中的所有类,创建相应的实例,存放到map中 for (String className : classCache) { // 加载当前遍历的类 Class<?> clazz = Class.forName(className); // 获取当前遍历类所实现的接口名称 String interfaceName = clazz.getInterfaces()[0].getName(); System.out.println("interfaceName:"+interfaceName); // 将接口名作为key,当前遍历类的实例作为value,写入到map中 registryMap.put(interfaceName, clazz.newInstance()); } } } 

(4)定义处理器handler

// 服务端处理器: // 1 接收客户端提交的调用消息,并根据调用消息调用本地的服务 // 2 将本地服务运行结果返回给客户端 public class RpcServerHandler extends ChannelInboundHandlerAdapter { private Map<String, Object> registryMap; // 接收服务注册表 public RpcServerHandler(Map<String, Object> registryMap) { this.registryMap = registryMap; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof InvokeMessage) { Object result = "没有指定的提供者"; InvokeMessage message = (InvokeMessage) msg; System.out.println("收到执行请求信息:"+message); // 判断注册表中是否存在客户端要调用的服务 if(registryMap.containsKey(message.getServiceName())) { // 获取到指定名称的服务提供者实例 Object provider = registryMap.get(message.getServiceName()); result = provider.getClass() .getMethod(message.getMethodName(), message.getParamTypes()) .invoke(provider, message.getParamValues()); System.out.println("执行结果:"+result); } ctx.writeAndFlush(result); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } 

(5)服务启动类

public class RpCServerStarter { public static void main(String[] args) throws Exception { new RpcServer().publish("com.netty.rpc.service"); System.in.read(); } } 

运行启动:
用Netty手写一个RPC远程过程调用框架
3.创建客户端client
(1)创建maven工程rpc-client,并导入依赖:

 <dependencies> <!-- netty-all依赖 --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.36.Final</version> </dependency> <!--API工程依赖--> <dependency> <groupId>com.netty</groupId> <artifactId>rpc-api</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> <!--lombok依赖--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.6</version> <scope>provided</scope> </dependency> </dependencies> 

(2)在客户端创建一个动态代理类,用于动态代理服务端的提供者对象,连接上server将调用信息发送给server端,进行远程调用:

public class RpcProxy { // 泛型方法 public static <T> T create(final Class<?> clazz) { return (T)Proxy.newProxyInstance( clazz.getClassLoader(), // 类加载器 new Class[]{clazz}, // 代理的接口 new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 若代理对象调用的是Object的方法,则直接进行本地调用 if(Object.class.equals(method.getDeclaringClass())) { return method.invoke(this, args); } // 若代理对象调用的是业务接口中的方法,则进行远程调用 return rpcInvoke(clazz, method, args); } }); } // 进行远程调用 private static Object rpcInvoke(Class<?> clazz, Method method, Object[] args) throws InterruptedException { NioEventLoopGroup loopGroup = new NioEventLoopGroup(); final RpcClientHandler handler = new RpcClientHandler(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(loopGroup) .channel(NioSocketChannel.class) // TCP默认使用了一个称为Nagle的算法,该算法可以通过发送尽量大的数据块来充分利用网络带宽, // 若该属性设置为true,则TCP不进行发送延迟,有数据马上就发送,不进行大数据块的积攒 // 若属性设置为false,则当要发送的数据积攒到一定大小后才会发送 .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ObjectEncoder()); pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); pipeline.addLast(handler); } }); ChannelFuture future = bootstrap.connect("localhost", 8888).sync(); // 创建并初始化调用信息对象 InvokeMessage message = new InvokeMessage(); message.setServiceName(clazz.getName()); message.setMethodName(method.getName()); message.setParamTypes(method.getParameterTypes()); message.setParamValues(args); // 一旦连接上Server,马上就将调用信息发送给Server端 future.channel().writeAndFlush(message); future.channel().closeFuture().sync(); } finally { loopGroup.shutdownGracefully(); } Object reObj=handler.getResult(); System.out.println("reObj="+reObj); return reObj; } } 

(3)定义客户端处理器,接受调用server端后server端传过来的结果:

// 客户端处理器 // 接收Server传来的远程调用结果 public class RpcClientHandler extends SimpleChannelInboundHandler { private Object result; public Object getResult() { return this.result; } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // msg即为服务端传递来的远程调用结果 this.result = msg; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } 

(4)编写客户端测试类,远程调用:

public class SomeConsumer { public static void main(String[] args) { SomeService service = RpcProxy.create(SomeService.class); String result = service.doSome("拒绝PUA"); System.out.println(result); } } 

用Netty手写一个RPC远程过程调用框架
以上,就算是一个简单RPC框架了,实现了远程过程调用。欢迎一起来交流!

本文地址:https://blog.csdn.net/qq_38145483/article/details/107886888