手把手教你用Netty实现一个RPC框架
程序员文章站
2022-03-20 23:00:34
写在开头本文使用Netty简单实现一个RPC框架,包括服务端,客户端,注册中心等,暂时不考虑监控,并且因为使用的Netty,所以使用到了Netty的封装API,所以不熟悉NettyAPI的小伙伴可以先熟悉一下API,我在代码中也注释了相关步骤和逻辑,因为Neety其实就是对网络通信的封装框架,所以底层还是IO那一套。建议大家熟悉一下NIO的三件套,buffer,selector,channel。项目结构api 服务接口consumer 消费者protocol 策略对象provi....
写在开头
本文使用Netty简单实现一个RPC框架,包括服务端,客户端,注册中心等,暂时不考虑监控,并且因为使用的Netty,所以使用到了Netty的封装API,所以不熟悉NettyAPI的小伙伴可以先熟悉一下API,我在代码中也注释了相关步骤和逻辑,因为Netty其实就是对网络通信的封装框架,所以底层还是IO那一套。建议大家熟悉一下NIO的三件套,buffer,selector,channel。
项目结构
- api 服务接口
- consumer 消费者
- protocol 策略对象
- provider 生产者-api接口服务实现
- registry 注册中心
api
/**
* @author : Ls
* @ClassName : IRpcService
* @date : 2021-01-04 15:46
* @Version : 1.0
* @Description : RPC基础服务接口
**/
public interface IRpcDoSomethingService {
int add(int a,int b);
int sub(int a,int b);
int mul(int a,int b);
int div(int a,int b);
}
/**
* @author : Ls
* @ClassName : IRpcHelloService
* @date : 2021-01-04 15:45
* @Version : 1.0
* @Description : Rpc基础服务接口
**/
public interface IRpcSayHelloService {
String sayHello(String name);
}
provider
/**
* @author : Ls
* @ClassName : IRpcDoSomethingServiceImpl
* @date : 2021-01-04 15:50
* @Version : 1.0
* @Description :
**/
public class IRpcDoSomethingServiceImpl implements IRpcDoSomethingService {
public int add(int a, int b) {
return a + b;
}
public int sub(int a, int b) {
return a - b;
}
public int mul(int a, int b) {
return a * b;
}
public int div(int a, int b) {
return a / b;
}
}
/**
* @author : Ls
* @ClassName : IRpcHelloServiceImpl
* @date : 2021-01-04 15:47
* @Version : 1.0
* @Description :
**/
public class IRpcSayHelloServiceImpl implements IRpcSayHelloService {
public String sayHello(String name) {
return "Hello "+name+" !!" ;
}
}
procotol
/**
* 自定义传输协议
*/
@Data
public class MyInvokerProtocol implements Serializable {
private String className;//类名
private String methodName;//函数名称
private Class<?>[] parames;//形参列表
private Object[] values;//实参列表
}
registry
/**
* @author : Ls
* @ClassName : IRpcRegistry
* @date : 2021-01-04 15:52
* @Version : 1.0
* @Description : 注册中心
**/
public class IRpcRegistry {
// 暴露端口
private int port;
public IRpcRegistry(int port){
this.port = port;
}
/**
* 开启服务接口监听
* 调用 netty 相关API实现接口监听
*/
public void listen(){
// Boss线程 (Selector核心)
NioEventLoopGroup boss = new NioEventLoopGroup();
// Work线程 (工作线程)
NioEventLoopGroup work = new NioEventLoopGroup();
// 1. 建立服务
ServerBootstrap server = new ServerBootstrap();
// 2. 注入 Boos/Worker
server.group(boss,work)
.channel(NioServerSocketChannel.class) // 3. 管道执行 keys 轮询的核心
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel channel) throws Exception {
// 5. 对流数据进行解析
ChannelPipeline pipeline = channel.pipeline();
// 6. 自定义协议解码器 (取决于自己定义的规则对象)
/** 入参有5个,分别解释如下
* maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
* lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
* lengthFieldLength:长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long型就是8)
* lengthAdjustment:要添加到长度字段值的补偿值
* initialBytesToStrip:从解码帧中去除的第一个字节数
*/
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
//自定义协议编码器
pipeline.addLast(new LengthFieldPrepender(4));
// 7. 参数解析
//对象参数类型编码器
pipeline.addLast("encoder",new ObjectEncoder());
//对象参数类型解码器
pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
// 8. 执行业务逻辑
pipeline.addLast(new IRegistryHandler());
}
}) // 4. 子线程 执行对应的业务逻辑
.option(ChannelOption.SO_BACKLOG,128) // 主线程最大连接数
.childOption(ChannelOption.SO_KEEPALIVE,true); // 子线程持续
try {
// 服务绑定端口
ChannelFuture future = server.bind(port).sync();
System.out.println("RPC start success, listen port is :" + port + " !!");
future.channel().closeFuture().sync(); // 回调
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
public static void main(String[] args) {
// 监听启动
new IRpcRegistry(8080).listen();
}
}
/**
* @author : Ls
* @ClassName : IRegistryHandler
* @date : 2021-01-04 16:19
* @Version : 1.0
* @Description : 业务执行
**/
public class IRegistryHandler extends ChannelInboundHandlerAdapter {
// 注册中心 (容器)
public static ConcurrentHashMap<String,Object> context = new ConcurrentHashMap<String,Object>();
// 类信息集合
private List<String> classNames = new ArrayList<String>();
// 构造器初始化
public IRegistryHandler(){
// 包扫描信息
scannerClass("com.lishuo.netty.rpc.provider");
// 注册容器
doRegistry();
}
// 递归包扫描
private void scannerClass(String packageName) {
// 获取类加载器
URL url = this.getClass().getClassLoader().getResource(packageName.replaceAll("\\.", "/"));
File dir = new File(url.getFile());
for (File file : dir.listFiles()) {
//如果是一个文件夹,继续递归
if(file.isDirectory()){
scannerClass(packageName + "." + file.getName());
}else{
classNames.add(packageName + "." + file.getName().replace(".class", "").trim());
}
}
}
// 注册
private void doRegistry() {
if(classNames.isEmpty()){return;}
for (String className : classNames) {
try {
Class<?> clazz = Class.forName(className);
Class<?> anInterface = clazz.getInterfaces()[0]; // 因为此demo默认实现一个接口 所以此处写死获取当前第一个接口信息
context.put(anInterface.getName(),clazz.newInstance());
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 请求到达 执行执行的业务逻辑
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Object result = new Object();
MyInvokerProtocol request = (MyInvokerProtocol) msg; // netty 会按照我们自定义的策略进行转换
// 判断当前调用服务在容器中是否真正存在
if(context.containsKey(request.getClassName())){
// 确实存在执行对应的业务逻辑
Object clazz = context.get(request.getClassName());
// 获取真正执行的
Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParames());
result = method.invoke(clazz, request.getValues());
}
if(result != null){
ctx.write(result);
}
ctx.flush();
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
consumer
/**
* @author : Ls
* @ClassName : IRpcConsumer
* @date : 2021-01-04 16:58
* @Version : 1.0
* @Description :
**/
public class IRpcConsumer {
public static void main(String[] args) {
IRpcSayHelloService invoke = IRpcProxy.invoke(IRpcSayHelloService.class);
System.out.println(invoke.sayHello("netty"));
IRpcDoSomethingService invoke1 = IRpcProxy.invoke(IRpcDoSomethingService.class);
System.out.println(invoke1.add(2,4));
System.out.println(invoke1.mul(2,4));
System.out.println(invoke1.sub(2,4));
System.out.println(invoke1.div(2,4));
}
}
/**
* @author : Ls
* @ClassName : IRpcProxy
* @date : 2021-01-04 16:59
* @Version : 1.0
* @Description :
**/
public class IRpcProxy {
public static <T> T invoke(Class<T> clazz){
Class<?> [] interfaces = clazz.isInterface() ?
new Class[]{clazz} :
clazz.getInterfaces();
T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,new ConsumerProxyHandler(clazz));
return result;
}
}
/**
* @author : Ls
* @ClassName : ConsumerProxyHandler
* @date : 2021-01-04 17:09
* @Version : 1.0
* @Description :
**/
public class ConsumerProxyHandler implements InvocationHandler {
private Class<?> clazz;
public ConsumerProxyHandler(Class<?> clazz){
this.clazz = clazz;
}
// 动态代理执行相应业务逻辑
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if(Object.class.equals(method.getDeclaringClass())){
// 如果当前就是一个实现类
return method.invoke(proxy,args);
}else{
return rpcInvoke(method,args);
}
}
private Object rpcInvoke(Method method, Object[] args) {
MyInvokerProtocol request = new MyInvokerProtocol();
request.setClassName(this.clazz.getName()); // 类名称
request.setMethodName(method.getName()); // 方法名称
request.setParames(method.getParameterTypes()); // 入参列表
request.setValues(args); // 实参列表
// TCP 远程调用
final IRpcProxyHandler consumerHandler = new IRpcProxyHandler();
NioEventLoopGroup work = new NioEventLoopGroup();
Bootstrap server = new Bootstrap();
server.group(work)
.channel(NioSocketChannel.class)// 客户端管道
.option(ChannelOption.TCP_NODELAY, true) // 开启
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
//自定义协议编码器
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
//对象参数类型编码器
pipeline.addLast("encoder", new ObjectEncoder());
//对象参数类型解码器
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast("handler",consumerHandler);
}
});
ChannelFuture future = null;
try {
future = server.connect("localhost", 8080).sync();
future.channel().writeAndFlush(request).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
work.shutdownGracefully();
}
return consumerHandler.getResponse();
}
}
/**
* @author : Ls
* @ClassName : IRpcProxyHandler
* @date : 2021-01-04 17:18
* @Version : 1.0
* @Description :
**/
public class IRpcProxyHandler extends ChannelInboundHandlerAdapter {
private Object response;
public Object getResponse() {
return response;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response = msg;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
本文地址:https://blog.csdn.net/ls490447406/article/details/112230244