用Netty手写一个RPC远程过程调用框架
接下来废话不多说,直接撸代码。先用Netty写一个RPC框架,实现远程过程调用,来推开Netty实用场景的大门。后续还会接上用Netty来实现聊天室、心跳机制以及用Netty手写一个雏形的Dubbo框架。
**
手写RPC框架
**
RPC是远程过程调用的简写,是一个协议,处于网络通信协议的第五层:会话层,其下就是TCP/IP协议,建立在其基础上的通信会话协议。RPC定义了交互的模式,而应用程序使用这些模式,来访问其他服务器的方法,并不需要关系具体的网络上的细节。 RPC采用C/S模式。请求程序即客户机,而服务提供程序就是一个服务器。首先,客户端调用进程发送一个带有请求参数的调用信息到服务端,然后等待应答信息。在服务端,进程一直保持睡眠状态直到接收到调用信息为止。当一个调用信息到达,服务器根据请求参数进行计算,并将结果发送给发出请求的客户端,然后等待下一个调用信息。客户端调用进程接收到答复信息,获得调用结果,然后继续发出下一次调用。
废话不多说,就是干。
RPC框架说明:
I.用户将业务接口通知到Server与Client,这里要注意的是***业务接口***也就是***服务名称***。
II. 用户只需将业务接口的实现类写入到Server端的指定包下,并且在server端对该包下的所有实现类进行发布。
III. Client端只需根据服务名(业务接口名)就可获取到Server端发布的服务提供者(动态代理),然后进行远程调用,获得server端实现类执行的结果。
需要的项目工程如下:
1.创建API工程,定义服务接口
(1)创建一个普通的maven工程,导入依赖,这里仅需lombok,不用lombok也行,随性。
```xml
<dependencies>
<!--lombok依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
<scope>provided</scope>
</dependency>
</dependencies>
(2) 定义业务接口和常量类
```java
/**
*业务接口,很随性地搞点事情
*/
public interface SomeService {
String doSome(String some);
}
/**
*客户端发送给服务端的服务调用信息
*/
@Data
public class InvokeMessage implements Serializable {
private static final long serialVersionUID = 1L;
//服务名称(接口名称)
private String serviceName;
//方法名称
private String methodName;
//方法参数类型列表
private Class<?>[] paramTypes;
//方法参数值列表
private Object[] paramValues;
}
2.创建server端
(1)创建一个maven工程,依赖:
<dependencies> <!-- netty-all依赖 --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.36.Final</version> </dependency> <!--API工程依赖 --> <dependency> <groupId>com.netty</groupId> <artifactId>rpc-api</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> <!--lombok依赖 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.6</version> <scope>provided</scope> </dependency> </dependencies>
(2)定义接口实现类,提供业务产出
/**
* 服务提供者
*
*/ public class SomeServiceImpl implements SomeService { public String doSome(String anything) { return "欢迎来搞事,就是干~ "+anything; } }
(3)定义服务器类即server,主要实现服务的注册、发布
public class RpcServer { // 服务注册表:Map(指定包下的实现类的实例存放到这个集合中) private Map<String, Object> registryMap = new HashMap<>(); // 创建List(指定包中的class加载到这个集合中) private List<String> classCache = new ArrayList<>(); private static int port=8888; // 将指定包下的所有业务接口实现类进行发布 public void publish(String providerPackage) throws Exception { // 扫描指定包下的所有实现类 getProviderClass(providerPackage); // 提供者注册 doRegister(); NioEventLoopGroup parentGroup = new NioEventLoopGroup(); NioEventLoopGroup childGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(parentGroup, childGroup) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ObjectEncoder()); pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); pipeline.addLast(new RpcServerHandler(registryMap)); } }); ChannelFuture future = bootstrap.bind(port).sync(); System.out.println("微服务已注册成功,port:"+port); future.channel().closeFuture().sync(); } finally { parentGroup.shutdownGracefully(); childGroup.shutdownGracefully(); } } // 扫描指定包下的所有实现类:要将指定包下的class添加到一个集合 // providerPackage的形式如:com.abc.service public void getProviderClass(String providerPackage) { // 获取指定包的资源对象 URL resource = this.getClass().getClassLoader() // 获取到类加载器 // 将包中的点(.)替换为路径中的/ .getResource(providerPackage.replaceAll("\\.", "/")); // 获取路径文件 File dir = new File(resource.getFile()); for(File file : dir.listFiles()) { // 若当前遍历的是个目录,则递归调用 if(file.isDirectory()) { getProviderClass(providerPackage + "." + file.getName()); // 若当前遍历的是一个.class文件,则将文件名中的.class扩展名去掉 } else if(file.getName().endsWith(".class")){ // 获取到文件名,即类的简单类名 String fileName = file.getName().replace(".class", "").trim(); // 将实现类的全限定性类名添加到classCache中 classCache.add(providerPackage + "." + fileName); } } System.out.println("classCache:"+classCache); } // 提供者注册:将指定包中的实现类实例化后存放到一个map中 // key为实现类的接口名,value为实现类实例 private void doRegister() throws Exception { // 若指定包中没有类,则无需注册 if(classCache.size() == 0) return; // 遍历缓存集合中的所有类,创建相应的实例,存放到map中 for (String className : classCache) { // 加载当前遍历的类 Class<?> clazz = Class.forName(className); // 获取当前遍历类所实现的接口名称 String interfaceName = clazz.getInterfaces()[0].getName(); System.out.println("interfaceName:"+interfaceName); // 将接口名作为key,当前遍历类的实例作为value,写入到map中 registryMap.put(interfaceName, clazz.newInstance()); } } }
(4)定义处理器handler
// 服务端处理器: // 1 接收客户端提交的调用消息,并根据调用消息调用本地的服务 // 2 将本地服务运行结果返回给客户端 public class RpcServerHandler extends ChannelInboundHandlerAdapter { private Map<String, Object> registryMap; // 接收服务注册表 public RpcServerHandler(Map<String, Object> registryMap) { this.registryMap = registryMap; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof InvokeMessage) { Object result = "没有指定的提供者"; InvokeMessage message = (InvokeMessage) msg; System.out.println("收到执行请求信息:"+message); // 判断注册表中是否存在客户端要调用的服务 if(registryMap.containsKey(message.getServiceName())) { // 获取到指定名称的服务提供者实例 Object provider = registryMap.get(message.getServiceName()); result = provider.getClass() .getMethod(message.getMethodName(), message.getParamTypes()) .invoke(provider, message.getParamValues()); System.out.println("执行结果:"+result); } ctx.writeAndFlush(result); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
(5)服务启动类
public class RpCServerStarter { public static void main(String[] args) throws Exception { new RpcServer().publish("com.netty.rpc.service"); System.in.read(); } }
运行启动:
3.创建客户端client
(1)创建maven工程rpc-client,并导入依赖:
<dependencies> <!-- netty-all依赖 --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.36.Final</version> </dependency> <!--API工程依赖--> <dependency> <groupId>com.netty</groupId> <artifactId>rpc-api</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> <!--lombok依赖--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.6</version> <scope>provided</scope> </dependency> </dependencies>
(2)在客户端创建一个动态代理类,用于动态代理服务端的提供者对象,连接上server将调用信息发送给server端,进行远程调用:
public class RpcProxy { // 泛型方法 public static <T> T create(final Class<?> clazz) { return (T)Proxy.newProxyInstance( clazz.getClassLoader(), // 类加载器 new Class[]{clazz}, // 代理的接口 new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 若代理对象调用的是Object的方法,则直接进行本地调用 if(Object.class.equals(method.getDeclaringClass())) { return method.invoke(this, args); } // 若代理对象调用的是业务接口中的方法,则进行远程调用 return rpcInvoke(clazz, method, args); } }); } // 进行远程调用 private static Object rpcInvoke(Class<?> clazz, Method method, Object[] args) throws InterruptedException { NioEventLoopGroup loopGroup = new NioEventLoopGroup(); final RpcClientHandler handler = new RpcClientHandler(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(loopGroup) .channel(NioSocketChannel.class) // TCP默认使用了一个称为Nagle的算法,该算法可以通过发送尽量大的数据块来充分利用网络带宽, // 若该属性设置为true,则TCP不进行发送延迟,有数据马上就发送,不进行大数据块的积攒 // 若属性设置为false,则当要发送的数据积攒到一定大小后才会发送 .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ObjectEncoder()); pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); pipeline.addLast(handler); } }); ChannelFuture future = bootstrap.connect("localhost", 8888).sync(); // 创建并初始化调用信息对象 InvokeMessage message = new InvokeMessage(); message.setServiceName(clazz.getName()); message.setMethodName(method.getName()); message.setParamTypes(method.getParameterTypes()); message.setParamValues(args); // 一旦连接上Server,马上就将调用信息发送给Server端 future.channel().writeAndFlush(message); future.channel().closeFuture().sync(); } finally { loopGroup.shutdownGracefully(); } Object reObj=handler.getResult(); System.out.println("reObj="+reObj); return reObj; } }
(3)定义客户端处理器,接受调用server端后server端传过来的结果:
// 客户端处理器 // 接收Server传来的远程调用结果 public class RpcClientHandler extends SimpleChannelInboundHandler { private Object result; public Object getResult() { return this.result; } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // msg即为服务端传递来的远程调用结果 this.result = msg; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
(4)编写客户端测试类,远程调用:
public class SomeConsumer { public static void main(String[] args) { SomeService service = RpcProxy.create(SomeService.class); String result = service.doSome("拒绝PUA"); System.out.println(result); } }
以上,就算是一个简单RPC框架了,实现了远程过程调用。欢迎一起来交流!
本文地址:https://blog.csdn.net/qq_38145483/article/details/107886888
上一篇: Java课堂笔记
下一篇: 澳门娱乐场所排行 澳门有哪些娱乐