Reactor学习
Reactor
我们需要先了解下NIO的基础和概念。
NIO相对于BIO是非阻塞的API。NIO有三个核心组件分别是:
Buffer缓冲区
channel通道
selector选择器
Buffer缓冲区
缓冲区的本质是一个可以写入数据的内存模块,可以再次读取。
使用Buffer进行数据写入和读取,有四个步骤:
1、将数据写入缓冲区,
2、调用buffer.filp(),转为读取模式
3、缓冲区读取数据
4、调用buffer.clear()或者buffer.compact()清除缓存区
channel通道
bio的管道是outStream和inputStream直接对对接。Nio是channel和channel对接。如下图:
主要是socketchannel和serversocketcannel.
socketchannel用户tcp的网络连接。对应客户端而言是客户端主动发起和客户端的链接。对应服务端是服务的获取新的链接。
SocketChannel socketChannel =SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(“127.0.0.1”,9030));
//创建网络请求
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);//设置非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(9030));//绑定端口
System.out.println(“启动成功”);
selector选择器
实现了单个线程可以管理多个通道,从而管理多个网络连接。
一个线程selector监听多个channel的不同事件:
1、connect连接
2、accept准备就绪
3、read读取
4、write写入
Selector 一般称 为选择器 ,当然你也可以翻译为 多路复用器 。
通道触发了一个事件意思是该事件已经就绪。比如某个Channel成功连接到另一个服务器称为“ 连接就绪 ”。一个Server Socket Channel准备好接收新进入的连接称为“ 接收就绪 ”。一个有数据可读的通道可以说是“ 读就绪 ”。等待写数据的通道可以说是“ 写就绪 ”。
单线程模型Reactor
所有的IO操作都在同一个NIO线程上完成的。单线程NIO处理监听请求、接受读请求、处理业务逻辑、写请求等。
作为服务端的Reactor:
1、接受客户端的tcp请求
2、读取通信端的请求和应答信息
3、向通信端发送请求。
单reacotor的模型如下:
Reactor 多线程模型
Reactor多线程模型与单线程模型最大的区别在于,IO处理线程不再是一个线程,而是一组NIO处理线程。原理如下图所示
其特点:
1)有一个acceptor线程监听客户端 的tcp请求
2)读写IO线程,由专门的线程池负责读写。这些io读写的线程池负责读取、发送数据
3)一个acceptor线程可以处理N个链路,但是一个链路只有一个accptor线程
主从Reactor 多线程模型
主从Reactor线程模型的特点是:服务端用于接收客户端连接的不再是一个单独的NIO线程,而是一个独立的NIO的线程池。Acceptor接收到客户端TCP连接请求并处理完成后(可能包含接入认证),再将新创建的SocketChannel注册到IO线程池(sub reactor)的某个IO处理线程上并处理编解码和读写工作。Acceptor线程池仅负责客户端的连接与认证,一旦链路连接成功,就将链路注册到后端的sub Reactor的IO线程池中。 线程模型图如下
package com.wm.test.net.mynio;
import com.study.hc.net.nio.NIOServerV3;
import com.sun.org.apache.bcel.internal.generic.Select;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 多路复用选择器 是selector
*
* reactor线程模型,一般是2种
* 一种是单reactor模式
* reactor负责处理accept,和读、写就绪
* handle负责处理读数据和写数据, workspace工作线程负责处理业务
*
* 二种是多线程reator模式
* mainreactor是负责处理accept,读写就绪
* subreactor线程负责处理读、写数据
* workspace工作线程负责处理业务
*
*
*/
public class NIOServierV3{
/** 处理业务操作的线程 */
private static ExecutorService workPool = Executors.newCachedThreadPool();
/**只有1个serverchannel 和多个客户端client 连接**/
private ServerSocketChannel serverSocketChannel;
/** 1、创建多个线程 - accept处理reactor线程 (accept线程)**/
private ReactorThread[] mainReactorThread = new ReactorThread[1];
private ReactorThread[] subReactorThread= new ReactorThread[8];
/** ReactorThread 线程,封装着,selector,accept和reader 以及 serverSocketChannel 和 SocketChannel
* 注册监听事件中**/
abstract class ReactorThread extends Thread{
Selector selector;//多路复用选择器
LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();//单独注册channle用的
//seletor 监听到事件后,调用这个方法
public abstract void handle(SelectableChannel selectableChannel)throws Exception;
//初始化选择器
public ReactorThread() throws IOException {
this.selector=Selector.open();
}
volatile boolean running =false;
private void doStart(){
if(!running){
running=true;
super.start();//启动线程
}
}
private SelectionKey regist(SelectableChannel selectableChannel) throws Exception {
// selectableChannel.register(selector,0,selectableChannel);//频道注册到选择器上,模式是0
// 因为线程在执行channel注册到selector的过程中,会和调用selector.select()方法的线程争用同一把锁
// 代码里select()方法实在eventLoop中通过while循环调用的,争抢的可能性很高,为了让register能更快的执行,就放到同一个线程来处理
//必须等任务都注册完成后,才能调用select()
//上面的方法改为由线程任务提交模式
FutureTask<SelectionKey> task = new FutureTask<SelectionKey>(()-> selectableChannel.register(selector, 0, selectableChannel));
taskQueue.add(task);
return task.get();
}
@Override
public void run() {
while(running){
Runnable runnable;
while((runnable=taskQueue.poll())!=null){
runnable.run();
}
try {
selector.select(1000);
//取出所有的事件集合
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> iterator= selected.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
//key对应的时间不同的注册事件不一样
iterator.remove();
int readyOps = key.readyOps();
//public final boolean isReadable()
//测试此键的通道是否已准备好进行读取。
//调用此方法的形式为 k.isReadable() ,该调用与以下调用的作用完全相同:
/* if((readyOps & (SelectionKey.OP_ACCEPT | SelectionKey.OP_READ))!=0 || readyOps==0){
}*/
//上面的用户相同
if(key.isReadable() || key.isAcceptable() || readyOps==0){
SelectableChannel selectableChannel= (SelectableChannel) key.attachment();
selectableChannel.configureBlocking(false);
try {
handle(selectableChannel);
//如果是serverchannel则是需要accept获取socketchannel和 注册读事件
//还有将serverchannel分配给subReactor去处理
//如果是socketchannel则需要直接读取数据,做业务逻辑处理
if(!selectableChannel.isOpen()){
key.cancel(); // 如果关闭了,就取消这个KEY的订阅
}
} catch (Exception e) {
e.printStackTrace();
key.cancel(); // 如果有异常,就取消这个KEY的订阅
}
}
}
//while的外面 与 selector.select并列
selector.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 绑定端口
*
* @throws IOException IO异常
*/
private void bind() throws IOException {
// 1、 正式绑定端口,对外服务
serverSocketChannel.bind(new InetSocketAddress(8080));
System.out.println("启动完成,端口8080");
}
/**
* 初始化线程组
*/
private void newMain和subReactorGroup() throws IOException {
// 创建mainReactor线程, 只负责处理serverSocketChannel
for (int i = 0; i < mainReactorThread.length; i++) {
AtomicInteger incr = new AtomicInteger(0);
mainReactorThread[i] = new ReactorThread() {
@Override
public void handle(SelectableChannel selectableChannel) throws Exception {
// 只做请求分发,不做具体的数据读取
ServerSocketChannel ch = (ServerSocketChannel) selectableChannel;
SocketChannel socketChannel= ch.accept();
socketChannel.configureBlocking(false);
// 收到连接建立的通知之后,分发给I/O线程继续去读取数据
int index = incr.getAndIncrement() % subReactorThread.length;
//ReactorThread workEventLoop = subReactorThread[index];
ReactorThread ioReactor=subReactorThread[index];
ioReactor.doStart();//io线程跑起来 和 io线程注册 是分开的,其实里面一个select()和rigitst里面都是用到锁的
//在regist用的是线程注册,futuretask是阻塞的,等dostart线程里面的taskqune任务run之后可以获取
SelectionKey selectionKey = ioReactor.regist(socketChannel);//是注册频道和channel、selector关联
selectionKey.interestOps(SelectionKey.OP_READ);
System.out.println(Thread.currentThread().getName() + "收到新连接 : " + socketChannel.getRemoteAddress());
}
};
}
// 创建IO线程,负责处理客户端连接以后socketChannel的IO读写
for (int i = 0; i < subReactorThread.length; i++) {
subReactorThread[i]=new ReactorThread(){
@Override
public void handle(SelectableChannel selectableChannel) throws Exception {
// work线程只负责处理IO处理,不处理accept事件
SocketChannel ch = (SocketChannel) selectableChannel;
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (ch.isOpen() && ch.read(requestBuffer) != -1) {
// 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)
if (requestBuffer.position() > 0) break;
}
if (requestBuffer.position() == 0) return; // 如果没数据了, 则不继续后面的处理
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println(Thread.currentThread().getName() + "收到数据,来自:" + ch.getRemoteAddress());
// TODO 业务操作 数据库、接口...
workPool.submit(() -> {
});
// 响应结果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
ch.write(buffer);
}
}
};
}
}
/**
* 初始化channel,并且启动一个eventLoop线程,
* 服务端需要启动serversockert
*
* @throws IOException IO异常
*/
private void initAndRegister() throws Exception {
// 1、 创建ServerSocketChannel
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 2、 将serverSocketChannel注册到selector
int index = new Random().nextInt(mainReactorThread.length);
mainReactorThread[index].doStart();
SelectionKey selectionKey = mainReactorThread[index].regist(serverSocketChannel);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
}
}