第十章 Netty
程序员文章站
2024-01-31 12:15:46
...
10.1 IO模型 10.2 Netty原理源码 10.3 Netty应用
10.1 IO模型
Reactor设计模式
所有IO操作都由NIO单线程完成,然后通过这个线程分配handler处理请求。
这样不会每有新的请求就会创建新的线程。有的线程存活但空闲,不给这些任务分配线程也节约了IO线程空间
10.2 Netty原理源码
一 原理
Netty主要基于主从Reactor多线程模型
Netty执行流程图
主从线程池具体介绍
1 Netty 抽象2组线程池
2 BossGroup 负责接收客户端连接
WorkerGroup 负责网络读写
3 两者都是NioEventloopGroup类型
4 NioEventloopGroup可有多线程,相当于一个事件循环组,组中多个事件不断循环,每个事件循环都是一个NioEventloop
5 NioEventloop:不断循环执行处理任务的线程,每个NioEventloop有一个select用于监听绑定在其上的socket网络通讯
6 每个Boss NioEventloopGroup循环执行步骤
1 轮询accept事件
2 处理accept与client连接,生成NioSocketChannel并将其注册到某个worker NioEventloop上的selector
3 处理任务队列任务,即runAllTasks
7 每个Worker NioEventloopGroup循环执行步骤
1 轮询读写事件
2 处理I/O事件(即读写事件),在对应的NioSocketChannel处理
3 runAllTasks
8 每个worker NioEventloop处理业务时会使用pipeline管道,理论上channel包含channelpipeline,每个pipeline包含多个ChannelHandlerContext,每个ChannelHandler封装一个ChannelHandler。每个pipeline也会包含即调出所在channel
bossGroup和WorkerGroup含有的子线程数默认cpu核数×2
子线程轮询执行
channel和pipeline互相包含
ctx包含了channel和pipeline
Reactor中任务队列(TaskQueue) runAllTasks
当有一个非常消耗时间的业务
会异步执行(先返回一个结果但IO还没完成),别的IO处理完再从taskQueue中拿task最后处理)
提交该channel先到对应NioEventloop的taskQueue中
异步模型
ChannelFuture是一个接口 extend Future
可添加监听器,当监听事件发生时触发
异步高并发更稳定更高吞吐量
ChannelFuture cf = bootstrap.bind(6668).syn();
cf.addListener(new ChannelFutureListener(){
@Override
~ operationComplete(ChannelFuture future){
if(cf.isSuccess()){
System.out.println("监听成功");
}else{
System.out.println("监听失败");
}
}
})
Netty核心组件
Bootstrap,ServerBootstrap
配置整个Netty程序,串联各个组件
bootstrap.handler(xxx) //对应BossGroup
bootstrap.childHandler(xxx) //对应workerGroup
Future, ChannelFuture
返回异步对象
Channel
执行网络I/O操作
可通过Channel获得当前网络连接通道状态,网络连接配置参数,提供异步操作
不同协议,不同阻塞提供不同类型的Channel
Selector
1 Netty基于selector实现I/O多路复用,一个selector线程可监听多个连接的channel事件
2 当向selector中注册channel,selector可不断自动查询这些注册的channel是否有就绪状态
ChannelHandler及其实现类
ChannelHandler一个接口, 处理或拦截I/O事件,并将其转发到其ChannelPipeline中下一个
ChannelPipeline
提供了ChannelHandler链的容器,如client为例,事件从client->server,称该事件为出站。即client发送给server的数据会通过pipeline中一系列ChannelOutBoundHandler处理,反之为入站
是一个双向链表,入站事件从head往tail传,出站tail往head传,两种类型handler互不干扰
channelPipeline实现了一种高级形式拦截过滤器模式,使用户可控制事件处理方式,及Channel中各个handler如何互相交互
ChannelHandlerContext
保存channel相关所有信息,同时关联一个channelHandler对象,即context中包含了具体事件处理器channelHandler也有对应的pipeline和channel信息,方便对channelHandler调用
ChannelOption
创建channel实例后设置channelOption参数,如初始化服务器可用队列大小,心跳保持连接等
EventLoopGroup
1 一组Eventloop为更好利用多核CPU资源,一般会有多个EventLoop同时工作,一个EventLoop维护一个selector
2 提供next接口,按规则拿其中Eventloop处理任务,如BossEventloop和WorkerEventloop
3 通常一个服务器端口即一个ServerSocketChannel对应一个selector和一个eventloop线程。BossEventLoop接收client连接并将socketChannel交给workerEventLoop处理IO
Unpooled类
用来操作缓冲区工具类(Netty数据容器),不用flip反转,底层维护了readerIndex,writerIndex
Netty零拷贝
heapByteBuffer是JVM heap上分配的Byte缓冲区,简单看成对这个byte缓冲区的直接byte[]数组封装,不需要拷贝直接引用封装写到DirectByteBuffer(机器内存上实现零拷贝)
二 基本源码
看完应用,再回来看源码
Netty对象池源码
核心组件源码
1 启动类 main方法中首先创建SSL配置类
2 创建两个EventLoopGroup对线
1 bossGroup 接受TCP请求,然后交给workerGroup通信
2 EventLoopGroup 包含多个Eventlooop,可注册channel,事件循环中选择
3 主 new NioEventloop(1) 1表示有1个线程可指定
从 new NioEventloop() 默认 cpu*2数量的线程
会创建EventExcutor数组 children = new EventExecutor[nThread];
每个元素类型为NioEventloop,NioEventloop实现了EventLoop和Executor接口
EventLoop接口
1 继承了ScheduledExecutorService接口,可接受定时任务
2 继承了SingleThreadEventExecutor接口,所以是单线程线程池
3 该接口一旦被channel注册,就处理该channel对应所有IO操作
4 它是单线程线程池,一个死循环线程反复不断做3件事情
监听端口:默认selector调用CAS阻塞1秒监听,
处理端口:有任务则唤醒selector调用processSelectKeys处理key
处理队列事件:再之后runAllTasks
5 一个Eventloop可以绑定多个channel,每个channel只能由一个Eventloop处理
children数组下
1 实例化单例线程池数组(CPU*2数量)
2 根据线程选择工厂创建一个线程选择器selector
3 每个单例线程池加一个关闭监听器
4 将所有单例线程池加入LinkedHashSet(children)
3 try块创建ServerBootstrap对象,引导类,用于启动服务器和引导整个程序初始化。即调用group方法将两个主从group加入自己字段
4 添加一个channel(服务器自身channel),参数是clas对象,引导类通过这个class反射创建ChannelFactory。再添加一些TCP参数(改channel在bind方法真正创建)
5 加服务器日志handler
6 加SocketChannel的handler配置主从handler
7 绑定端口并阻塞直到连接成功
bind底层两个方法
initAndRegister 简而言之就是初始化所有channel,handler,pipeline,context
dobind 则是select轮询事件,处理IO,处理任务队列
8 mian线程阻塞等待关闭
9 finally块中代码在服务器关闭时关闭所有资源
10.3 Netty应用
1 Netty服务端客户端简单通讯
服务端
//创建主从线程
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
//创建服务器启动对象,配置参数
try{
ServerBootstrap bootstrap = new ~;
//NioServerSocketChannel.class服务器通道反射实现
//ChannelOption.SO_BACKLOG,128线程队列连接个数
//ChannelOption.SO_KEEPALIVE,true从线程池都保持心跳机制
//new ChannelInitializer<SocketChannel>()给从线程池的eventloop对应管道设置处理器handler
bootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true).childHandler(new ChannelInitializer<SocketChannel>(){
@Override
~ initChannel(SocketChannel ch){
ch.pipeline.addLast(new NettyServerHandler());//添加自己的实现类
}
})
//服务器ready了
//绑定一个端口同步,生成一个ChannelFuture异步对线,启动server
ChannelFuture cf = bootstrap.bind(6668).sync();
//对关闭的通道监听,监听到关闭事件才关闭
cf.channel().closeFuture().sync();
}catch{
xxx
}finally{
xxx
}
//自定义一个handler需要继承netty规定好的适配器
public class NettyServerHandler extends ChannelInboundHanderAdapter{
//client data实现
//当通道有读取事件会触发
//ctx上下文对象包含pipeline和channel,msg是client发送的数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){
//讲msg数据转成ByteBuf(Netty提供的buffer)
ByteBuffer buf = (ByteBuf) msg;
System.out.println(buf.toString(CharsetUtil.UTF_8))//显示发送的数据
System...("客服端地址: "+ctx.channel().remoteAddress());
}
//数据读取完触发
@Override
public void channelReadComplete(ChannelHandlerContext ctx){
//讲数据写入缓存并刷新
ctx.writeAndFlush(Unpooled.copiedBuffer("HelloClient",~UTF_8));
}
//异常处理,一般是关闭通道
~ exceptionCaught(ChannelHandlerContext ctx,~){
ctx.channel().close(); or ctx.close();
}
}
客户端
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
//创建服务器启动对象,配置参数
try{
Bootstrap bootstrap = new ~;
bootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).handler(new ChannelInitializer<SocketChannel>(){
@Override
~ initChannel(SocketChannel ch){
ch.pipeline.addLast(new NettyClientHandler());
}
})
//启动client连接server
ChannelFuture cf = bootstrap.connect("127.0.0.1",6668).sync();
//对关闭的通道监听,监听到关闭事件才关闭
cf.channel().closeFuture().sync();
}catch{
xxx
}finally{
xxx
}
public class NettyClientHandler extends ChannelInboundHanderAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx){
ctx.writeAndFlush(Unpooled.copiedBuffer("HelloServer",~UTF-8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){
//读取服务器消息
ByteBuffer buf = (ByteBuf) msg;
System.out.println(buf.toString(CharsetUtil.UTF_8))//显示发送的数据
System...("服务端地址: "+ctx.channel().remoteAddress());
}
~ exceptionCaught(ChannelHandlerContext ctx,~){
ctx.channel().close(); or ctx.close();
}
}
2 编解码器
Netty的StringEncoder(String) 和 ObjectEncoder(Java对象) 都使用java序列化技术
体积太大,二进制编码5倍多。无法跨语言。序列化性能低
解决方案
TCP+谷歌的 protobuf 效率更高
3 TCP粘包拆包
解决方案:自定义协议+编解码器
Client端
pipeline.addLast(new MyMessageEncoder());
pipeline.addLast(new MyClientHandler());
Server端
pipeline.addLast(new MyMessageDncoder());
pipeline.addLast(new MyServerHandler());
客户端
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol>{
public void channelActive(ChannelHandlerContext ctx){
//自定义协议
MessageProtocol protocol = new ~() //自己定义的类,规定交互数据的长度内容
protocol.setLen(xxx);
protocol.setContent(xxx);
ctx.writeAndFlush(protocol);
}
}
//客户端编码器
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol>{
@Override
~ encode(ChannelHandlerContext ctx,MessageProtocol msg,ByteBuf out){
out.writeInt(msg.getLen());
out.writeBytes(msg.getContent());
}
}
服务端
public class MyMessageDecoder extends ReplayingDecoder<void>{
@Override
~ decode(ChannelHandlerContext ctx, ByteBuf in,List<Object> out){
//讲二进制字节码转成我们需要的MessageProtocol
int len = in.readInt();
byte[] content = new ~;
in.readBytes(content);
MessageProtocol protocol = new ~;
protocol.setLen(len);
protocol.setContent(content);
out.add(protocol);
}
}
public class MyServerHandler ~{
@Override
~ channelRead0(ChannelHandlerContext ctx, MessageProtocol msg){
//拿到数据进行业务处理
}
}
Netty耗时操作的解决
普通线程池仍同一个线程,会阻塞,所以需要异步
handler中新建一个线程池变量来处理