Netty的模型演进及快速入门
BIO模型
Demo
public class BIOServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(6666);
ExecutorService executorService = Executors.newCachedThreadPool();
while (true) {
System.out.println("等待客户端连接。。。。 ");
Socket socket = serverSocket.accept(); //阻塞
executorService.execute(() -> {
try {
InputStream inputStream = socket.getInputStream(); //阻塞
byte[] bytes = new byte[1024];
while (true){
int length = inputStream.read(bytes);
if(length == -1){
break;
}
System.out.println(new String(bytes, 0, length, "UTF-8"));
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
这种模式存在的问题:
- 客户端的并发数与后端的线程数成1:1的⽐例,线程的创建、销毁是⾮常消耗系统资源的,随着并发量增⼤,服务端性能将显著下降,甚⾄会发⽣线程堆栈溢出等错误。
- 当连接创建后,如果该线程没有操作时,会进⾏阻塞操作,这样极⼤的浪费了服务器资源。
NIO模型
NIO,称之为New IO 或是 non-block IO (⾮阻塞IO),这两种说法都可以,其实称之为⾮阻塞IO更恰当⼀些。
NIO相关的代码都放在了java.nio包下,其三⼤核⼼组件: Buffer(缓冲区) 、 Channel(通道) 、Selector(选择器/多路复⽤器)
- Buffer
- 在NIO中,所有的读写操作都是基于缓冲区完成的,底层是通过数组实现的,常⽤的缓冲区是
ByteBuffer,每⼀种java基本类型都有对应的缓冲区对象(除了Boolean类型),如:
CharBuffer、 IntBuffer、 LongBuffer等。
- 在NIO中,所有的读写操作都是基于缓冲区完成的,底层是通过数组实现的,常⽤的缓冲区是
- Channel
- 在BIO中是基于Stream实现,⽽在NIO中是基于通道实现,与流不同的是,通道是双向的,
既可以读也可以写。
- 在BIO中是基于Stream实现,⽽在NIO中是基于通道实现,与流不同的是,通道是双向的,
- Selector
- Selector是多路复⽤器,它会不断的轮询注册在其上的Channel,如果某个Channel上发⽣读
或写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey获
取就绪Channel的集合,进⾏IO的读写操作。
可以看出, NIO模型要优于BIO模型,主要是:
- Selector是多路复⽤器,它会不断的轮询注册在其上的Channel,如果某个Channel上发⽣读
- 通过多路复⽤器就可以实现⼀个线程处理多个通道,避免了多线程之间的上下⽂切换导致系统开销过⼤。
- NIO⽆需为每⼀个连接开⼀个线程处理,并且只有通道真正有有事件时,才进⾏读写操作,这样⼤⼤的减少了系统开销。
Demo
public class SelectorDemo {
/**
* 注册事件
*
* @return
*/
private Selector getSelector() throws Exception {
//获取selector对象
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); //⾮阻塞
//获取通道并且绑定端⼝
ServerSocket socket = serverSocketChannel.socket();
socket.bind(new InetSocketAddress(6677));
//注册感兴趣的事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
return selector;
}
public void listen() throws Exception {
Selector selector = this.getSelector();
while (true) {
selector.select(); //该⽅法会阻塞,直到⾄少有⼀个事件的发⽣
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
process(selectionKey, selector);
iterator.remove();
}
}
}
private void process(SelectionKey key, Selector selector) throws Exception{
if(key.isAcceptable()){ //新连接请求
ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel channel = server.accept();
channel.configureBlocking(false); //⾮阻塞
channel.register(selector, SelectionKey.OP_READ);
}else if(key.isReadable()){ //读数据
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
channel.read(byteBuffer);
System.out.println("form 客户端 " + new String(byteBuffer.array(),0, byteBuffer.position()));
}
}
public static void main(String[] args) throws Exception {
new SelectorDemo().listen();
}
}
AIO模型
AIO是asynchronous I/O的简称,是异步IO,该异步IO是需要依赖于操作系统底层的异步IO实现。
AIO的基本流程是:⽤户线程通过系统调⽤,告知kernel内核启动某个IO操作,⽤户线程返回。 kernel内核在整个IO操作(包括数据准备、数据复制)完成后,通知⽤户程序,⽤户执⾏后续的业务操作。
⽬前AIO模型存在的不⾜:
- 需要完成事件的注册与传递,这⾥边需要底层操作系统提供⼤量的⽀持,去做⼤量的⼯作。
- Windows 系统下通过 IOCP 实现了真正的异步 I/O。但是,就⽬前的业界形式来说, Windows 系统,很少作为百万级以上或者说⾼并发应⽤的服务器操作系统来使⽤。
- ⽽在 Linux 系统下,异步IO模型在2.6版本才引⼊,⽬前并不完善。所以,这也是在 Linux 下,实现⾼并发⽹络编程时都是以 NIO 多路复⽤模型模式为主。
Reactor线程模型
Reactor线程模型不是Java专属,也不是Netty专属,它其实是⼀种并发编程模型,是⼀种思想,具有指导意义。⽐如, Netty就是结合了NIO的特点,应⽤了Reactor线程模型所实现的。
Reactor模型中定义的三种⻆⾊:
- Reactor:负责监听和分配事件,将I/O事件分派给对应的Handler。新的事件包含连接建⽴就绪、读就绪、写就绪等。
- Acceptor:处理客户端新连接,并分派请求到处理器链中。
- Handler:将⾃身与事件绑定,执⾏⾮阻塞读/写任务,完成channel的读⼊,完成处理业务逻辑后,负责将结果写出channel。
Reactor单线程模型
说明:
- Reactor充当多路复⽤器⻆⾊,监听多路连接的请求,由单线程完成
- Reactor收到客户端发来的请求时,如果是新建连接通过Acceptor完成,其他的请求由Handler完成。
- Handler完成业务逻辑的处理,基本的流程是: Read --> 业务处理 --> Send 。
这种模型的优缺点:
- 优点
- 结构简单,由单线程完成,没有多线程、进程通信等问题。
- 适合⽤在⼀些业务逻辑⽐较简单、对于性能要求不⾼的应⽤场景。
- 缺点
- 由于是单线程操作,不能充分发挥多核CPU的性能。
- 当Reactor线程负载过重之后,处理速度将变慢,这会导致⼤量客户端连接超时,超时之后往往会进⾏重发,这更加重Reactor线程的负载,最终会导致⼤量消息积压和处理超时,成为系统的性能瓶颈。
- 可靠性差,如果该线程进⼊死循环或意外终⽌,就会导致整个通信系统不可⽤,容易造成单
点故障。
单Reactor多线程模型
说明:
- 在Reactor多线程模型相⽐较单线程模型⽽⾔,不同点在于, Handler不会处理业务逻辑,只是负责响应⽤户请求,真正的业务逻辑,在另外的线程中完成。
- 这样可以降低Reactor的性能开销,充分利⽤CPU资源,从⽽更专注的做事件分发⼯作了,提升整个应⽤的吞吐。
但是这个模型存在的问题:
- 多线程数据共享和访问⽐较复杂。如果⼦线程完成业务处理后,把结果传递给主线程Reactor进⾏发送,就会涉及共享数据的互斥和保护机制。
- Reactor承担所有事件的监听和响应,只在主线程中运⾏,可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握⼿进⾏安全认证,但是认证本身⾮常损耗性能。
为了解决性能问题,产⽣了第三种主从Reactor多线程模型。
主从Reactor多线程模型
在主从模型中,将Reactor分成2部分:
- MainReactor负责监听server socket,⽤来处理⽹络IO连接建⽴操作,将建⽴的socketChannel指定注册给SubReactor。
- SubReactor主要完成和建⽴起来的socket的数据交互和事件业务处理操作。
该模型的优点:
- 响应快,不必为单个同步事件所阻塞,虽然Reactor本身依然是同步的。
- 可扩展性强,可以⽅便地通过增加SubReactor实例个数来充分利⽤CPU资源。
- 可复⽤性⾼, Reactor模型本身与具体事件处理逻辑⽆关,具有很⾼的复⽤性。
Netty模型
Netty模型是基于Reactor模型实现的,对于以上三种模型都有⾮常好的⽀持,也⾮常的灵活,⼀般情况,在服务端会采⽤主从架构模型
说明:
- 在Netty模型中,负责处理新连接事件的是BossGroup,负责处理其他事件的是WorkGroup。Group就是线程池的概念。
- NioEventLoop表示⼀个不断循环的执⾏处理任务的线程,⽤于监听绑定在其上的读/写事件。
- 通过Pipeline(管道)执⾏业务逻辑的处理, Pipeline中会有多个ChannelHandler,真正的业务逻辑是在ChannelHandler中完成的。
Demo
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.wangj.myrpc</groupId>
<artifactId>MyRPC</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.50.Final</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
服务端MyRPCServer
package cn.wangj.myrpc.server;
import cn.wangj.myrpc.server.handler.MyChannelInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MyRPCServer {
public void start(int port) throws Exception {
// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup(1);
// ⼯作线程,线程数默认是: cpu*2
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 服务器启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker) //设置线程组
.channel(NioServerSocketChannel.class) //配置server通道
.childHandler(new MyChannelInitializer()); //worker线程的处理器
ChannelFuture future = serverBootstrap.bind(port).sync();
System.out.println("服务器启动完成,端⼝为: " + port);
//等待服务端监听端⼝关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
MyChannelInitializer
package cn.wangj.myrpc.server.handler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
/**
* ChannelHandler的初始化
*/
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//将业务处理器加⼊到列表中
ch.pipeline().addLast(new MyChannelHandler());
}
}
MyChannelHandler
package cn.wangj.myrpc.server.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class MyChannelHandler extends ChannelInboundHandlerAdapter {
/**
* 获取客户端发来的数据
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
ByteBuf byteBuf = (ByteBuf) msg;
String msgStr = byteBuf.toString(CharsetUtil.UTF_8);
System.out.println("客户端发来数据: " + msgStr);
//向客户端发送数据
ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
}
/**
* 异常处理
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
测试⽤例
package cn.wangj.myrpc;
import cn.wangj.myrpc.server.MyRPCServer;
import org.junit.Test;
public class TestServer {
@Test
public void testServer() throws Exception{
MyRPCServer myRPCServer = new MyRPCServer();
myRPCServer.start(5566);
}
}
客户端
package cn.wangj.myrpc.client;
import cn.wangj.myrpc.client.handler.MyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyRPCClient {
public void start(String host, int port) throws Exception {
//定义⼯作线程组
EventLoopGroup worker = new NioEventLoopGroup();
try {
//注意: client使⽤的是Bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker)
.channel(NioSocketChannel.class) //注意: client使⽤的是NioSocketChannel
.handler(new MyClientHandler());
//连接到远程服务
ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
} finally {
worker.shutdownGracefully();
}
}
}
MyClientHandler
package cn.wangj.myrpc.client.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("接收到服务端的消息: " + msg.toString(CharsetUtil.UTF_8));
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 向服务端发送数据
String msg = "hello";
ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
测试用例
package cn.wangj.myrpc;
import cn.wangj.myrpc.client.MyRPCClient;
import org.junit.Test;
public class TestClient {
@Test
public void testClient() throws Exception{
new MyRPCClient().start("127.0.0.1", 5566);
}
}
核心组件
EventLoop、 EventLoopGroup
ChannelHandler
ChannelPipeline
Bootstrap
⼩结
可结合https://www.processon.com/view/link/5f9827e45653bb30178b9d90中的Netty调优学习理解
本文地址:https://blog.csdn.net/wangmourena/article/details/112242624
下一篇: Spring注解学习总结---IOC部分