【Java IO】从NIO到Reactor三种模式
文章目录
- 一、前言
- 二、bytebuffer的基本使用(非重要,仅仅bytebuffer的语法,面试无用)
- 2.1 Demo1_Random_Print
- 2.2 Demo2_Read_Print
- 2.3 Demo3_bytes_Write
- 2.4 Demo4_Read_Write
- 2.5 Demo5
- 2.6 Demo6
- 2.7 Demo7
- 2.8 Demo8_Read_Write_DirectMemory(与Demo4对应,一个堆上,一个堆外)
- 2.9 Demo9
- 三、NIO(重要)
- 四、基于NIO的Reactor单线程模式(重要)
- 五、基于NIO的Reactor多线程模式(重要)
- 六、Reactor主从多线程模式(重要)
- 七、面试金手指(重要,面试语言组织,文字为主,当被问到IO的时候)
- 八、小结
一、前言
二、bytebuffer的基本使用(非重要,仅仅bytebuffer的语法,面试无用)
2.1 Demo1_Random_Print
2.1.1 bytebuffer三种变量:position limit capacity
Buffer中有几个关键的变量(capacity,limit,position),我们需要理解它,下面我会用画图的方式帮助理解:
上图我们可以看到,byteBuffer在创建的时候,会有三个下标:position指向是第一个位置,capacity、limit指向的是最后一个位置加1。当执行put()的时候position会移动,position会指向第一个空的缓冲区,并且它的值会永远小于limit。当执行flip()的时候,会将position的值赋值给limit,将position指向缓冲区的第一个位置。当执行write()的时候,会将position移动到limit的位置。
金手指:flip()和wrap()就是为打印System.out.println或者write做准备。
2.1.2 Demo1_Random_Print
public class Demo1_Random_Print {
// 这段程序的意义:仅仅演示buffer的使用 除了boolean之外,7中基本类型都有buffer
public static void main(String[] args) {
//创建一个Buffer对象,缓冲区的大小为8
IntBuffer buffer = IntBuffer.allocate(8);
//随机生成8个int类型的数字并放入到缓冲区中去
for (int i = 0; i < buffer.capacity(); i++) {
int nextInt = new SecureRandom().nextInt(20);
System.out.print(nextInt+" "); // 这里打印和后面打印是一样的
buffer.put(nextInt);
}
System.out.println();
//直接打印,打印不出来一个元素,因为position到最后面的后面去了
while (buffer.hasRemaining()) {
System.out.print(buffer.get()+" "); // 这里打印和后面打印是一样的
}
System.out.println();
//执行翻转 所谓的翻转,就是让position到前面去,放到0的位置,limit放到position的位置
buffer.flip();
//读出缓冲区的中的所有的数据
while (buffer.hasRemaining()) {
System.out.print(buffer.get()+" "); // 这里打印和后面打印是一样的
}
}
}
金手指:
这段程序的意义:仅仅演示buffer的使用,随机生成,然后put,然后get,然后打印System.out.println 除了boolean之外,7中基本类型都有buffer
2.2 Demo2_Read_Print
public class Demo2_Read_Print {
// public static void main(String[] args) throws Exception {
// FileInputStream fileInputStream = new FileInputStream("demo2.txt");
// FileChannel channel = fileInputStream.getChannel();
// ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// //将从文件中读取出来的内容放到byteBuffer中去。
// channel.read(byteBuffer);
// //然后进行翻转
// byteBuffer.flip();
// //return limit - position; good
// while (byteBuffer.remaining() > 0) {
// System.out.print((char) byteBuffer.get()+" ");
// }
// fileInputStream.close();
// }
// 实际上不需要缓存也是可以的 stream+bufferArray
// public static void main(String[] args) throws Exception{
// FileInputStream fileInputStream = new FileInputStream(new File("demo2_stream_bufferArray.txt"));
//
// byte[] buffer = new byte[1024];
// int len = -1;
// while ((len = fileInputStream.read(buffer)) != -1) {
// String string = new String(buffer, 0, len); // 每次读入一个缓冲大小然后输出
// System.out.println(string);
// }
//
//
// fileInputStream.close();
// }
// 实际上可以bufferStream
public static void main(String[] args) throws Exception{
InputStream fileInputStream =new BufferedInputStream( new FileInputStream(new File("demo2_onlyBufferStream.txt")));
int len = -1;
while ((len = fileInputStream.read()) != -1) {
System.out.print(len);
}
fileInputStream.close();
}
}
2.3 Demo3_bytes_Write
public class Demo3_bytes_Write {
// public static void main(String[] args) throws Exception{
// FileOutputStream fileOutputStream = new FileOutputStream("demo3.txt");
// FileChannel channel = fileOutputStream.getChannel(); //这里使用Channel Write stream联系文件 channel联系stream 这里没有selector
// byte[] bytes = "NIO write".getBytes();
// ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// //将内容放在缓冲区中
// byteBuffer.put(bytes);
// //翻转
// byteBuffer.flip();
// //然后将缓冲区写到channel中
// channel.write(byteBuffer);
// fileOutputStream.close();
// }
// 实际上不需要缓存也是可以的 stream
// public static void main(String[] args) throws Exception{
// FileOutputStream fileOutputStream = new FileOutputStream(new File("demo3_onlyStream.txt"));
// fileOutputStream.write("bio write".getBytes());
// fileOutputStream.close();
// }
// 实际上可以bufferStream
// public static void main(String[] args) throws Exception{
// OutputStream fileOutputStream =new BufferedOutputStream( new FileOutputStream(new File("demo3_onlyBufferStream.txt")));
// fileOutputStream.write("bio write".getBytes());
// fileOutputStream.close();
// }
}
2.4 Demo4_Read_Write
public class Demo4_Read_Write {
// 这段程序的意义:
// 1、Channel+Buffer == Stream Selector就用来用注册channel的
// 2、stream联系文件 channel联系stream 这里没有selector
// 3、buffer同时用于读缓存和写缓存 channelRead + buffer channelWrite + buffer
// 4、这是使用nio操作读写,实际上使用bio也是可以读写文件的,只是为下面Reactor三种模式铺垫,这才是这个程序的实际意义,为后面Reactor三种模式铺垫
public static void main(String[] args) throws Exception{
FileInputStream fileInputStream=new FileInputStream("demo4read.txt"); // 读进来 这里使用Stream流
FileOutputStream fileOutputStream = new FileOutputStream("demo4write.txt"); // 写出去 这里使用Stream流
FileChannel channelRead = fileInputStream.getChannel(); //这里使用Channel Read stream联系文件 channel联系stream 这里没有selector
FileChannel channelWrite = fileOutputStream.getChannel(); //这里使用Channel Write stream联系文件 channel联系stream 这里没有selector
ByteBuffer byteBuffer = ByteBuffer.allocate(100); // buffer同时用于读缓存和写缓存 channelRead + buffer channelWrite + buffer
while (true) { // 循环,跳出条件是if (-1 == readNumber),没有读到内容
byteBuffer.clear(); // 每次循环一定要清空,否则就有上一个了
System.out.println(byteBuffer.position()); // 打印一下位置
//将dome4read.txt内容读取到缓冲区
int readNumber = channelRead.read(byteBuffer); // channel读入,对于buffer是写出
System.out.print(readNumber+" - "); // 输出读到的内容
if (-1 == readNumber) { // 没有就结束 最后输出-1结尾
break;
}
//翻转 让position从0开始,打印limit-position
byteBuffer.flip();
//将读取到内容写入到dome4write.txt中去
channelWrite.write(byteBuffer); // channel写出,对于读入
}
fileOutputStream.close(); // 关闭两个流stream
fileInputStream.close();
}
}
// 这段程序的意义:
// 1、Channel+Buffer == Stream Selector就用来用注册channel的
// 2、stream联系文件 channel联系stream 这里没有selector
// 3、buffer同时用于读缓存和写缓存 channelRead + buffer channelWrite + buffer
// 4、这是使用nio操作读写,实际上使用bio也是可以读写文件的,只是为下面Reactor三种模式铺垫,这才是这个程序的实际意义,为后面Reactor三种模式铺垫
2.5 Demo5
public class Demo5 {
//这段程序的意义:一个buffer可以存放不同数据类型,仅仅是bytebuffer的特性,知道就好,与nio无关
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(100);
buffer.putChar('a');
buffer.putInt(2);
buffer.putLong(50000L);
buffer.putShort((short) 2);
buffer.putDouble(12.4);
System.out.println(buffer.position()); //这段程序的意义:一个buffer可以存放不同数据类型
buffer.flip();
System.out.println(buffer.getChar());
System.out.println(buffer.getInt());
System.out.println(buffer.getLong());
System.out.println(buffer.getShort());
System.out.println(buffer.getDouble());
}
}
金手指:
//这段程序的意义:一个buffer可以存放不同数据类型,仅仅是bytebuffer的特性,知道就好,与nio无关
2.6 Demo6
public class Demo6 {
// 该程序意义:bytebuffer和json一样,修改了子部分,父部分也会相应修改,仅仅是bytebuffer的特性,知道就好,与nio无关
public static void main(String[] args) {
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
for (int i = 0; i < byteBuffer.capacity(); i++) {
byteBuffer.put((byte) i); //放10个数字进去 放进去的是byte
}
byteBuffer.position(2);// 设置position
byteBuffer.limit(8); // 设置limit
//按照指定的position和limit的值创建Buffer
ByteBuffer resetBuffer = byteBuffer.slice(); // 得到resetbuffer
for (int i = 0; i < resetBuffer.capacity(); i++) { // 遍历resetbuffer
byte anInt = resetBuffer.get(); // 每次get 因为放进去的是byte 所以得到的类型使用byte接收
resetBuffer.put(i, (byte) (anInt * 2)); // 放到resetBuffer里面去,resetBuffer是一个bytebuffer value *2 要强转为byte类型
}
byteBuffer.position(0); // 设置bytebuffer position 0
byteBuffer.limit(byteBuffer.capacity()); // 设置bytebuffer limit
//最后遍历原来的Buffer发现也变了
while (byteBuffer.hasRemaining()) {
System.out.print(byteBuffer.get()+ " "); // byteBuffer值 2-7修改了 0 1 4 6 8 10 12 14 8 9
}
}
}
金手指:
// 该程序意义:bytebuffer和json一样,修改了子部分,父部分也会相应修改,仅仅是bytebuffer的特性,知道就好,与nio无关
2.7 Demo7
public class Demo7 {
// 其中HeapByteBuffer是堆内内存(JVM上面开辟的内存)。MappdeByteBuffer是堆外内存,不在JVM上开辟的内存。
// 这个程序意义:
public static void main(String[] args) {
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
for (int i = 0; i < byteBuffer.capacity(); i++) {
byteBuffer.put((byte) i); //放置10个数据
}
//生成一个只读的ByteBuffer bytebuffer变为readonlybuffer
ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
System.out.println(byteBuffer.getClass()); // 打印字节码 class java.nio.HeapByteBuffer 堆内
System.out.println(readOnlyBuffer.getClass()); // 打印字节码 class java.nio.HeapByteBufferR 堆内 R表示OnlyRead
readOnlyBuffer.flip(); // readonlybuffer 准备打印
System.out.println(byteBuffer.position()); // byteBuffer poistion 没有flip 10
System.out.println(readOnlyBuffer.position()); // readonlybuffer position flip 0
for (int i = 0; i < readOnlyBuffer.capacity(); i++) {
System.out.print(readOnlyBuffer.get()+" "); // 打印 0 1 2 3 4 5 6 7 8 9
}
}
}
金手指:
// 其中HeapByteBuffer是堆内内存(JVM上面开辟的内存)。MappdeByteBuffer是堆外内存,不在JVM上开辟的内存。
2.8 Demo8_Read_Write_DirectMemory(与Demo4对应,一个堆上,一个堆外)
public class Demo8_Read_Write_DirectMemory {
// 这段程序的意义:和demo4一样,唯一区别,之前是allocate 这次是allocateDirect 就是直接内存中
public static void main(String[] args) throws Exception {
FileOutputStream fileOutputStream=new FileOutputStream("demo8write.txt");
FileInputStream fileInputStream=new FileInputStream("demo8read.txt");
FileChannel channelRead = fileInputStream.getChannel();
FileChannel channelWrite = fileOutputStream.getChannel();
//生成的堆外内存
ByteBuffer byteBuffer= ByteBuffer.allocateDirect(100); // 之前是allocate 这次是allocateDirect 就是直接内存中
while (true){
byteBuffer.clear(); // 每次循环清空
int readNumber = channelRead.read(byteBuffer); // read
System.out.print(readNumber+" "); // 打印 -1 结尾 和Demo4一样,只是堆外
if(readNumber==-1){
break;
}
byteBuffer.flip();
channelWrite.write(byteBuffer);
}
fileOutputStream.close();
fileInputStream.close();
}
}
金手指:
// 这段程序的意义:和demo4一样,唯一区别,之前是allocate 这次是allocateDirect 就是直接内存中
2.9 Demo9
public class Demo9 {
// 实际意义:bytebuffer和json一样,修改了子,父会变
public static void main(String[] args) {
byte[] bytes = new byte[]{'a', 'b', 'c'}; //bytes数组
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); // 根据bytes数组得到bytebuffer
//原来字节数组值变了,字节缓冲区中的值也会变了
bytes[0] = 'b'; // 修改bytes数组
byteBuffer.put(2, (byte) 'b'); // 修改bytebuffer
for (int i = 0; i < byteBuffer.capacity(); i++) {
System.out.print((char) byteBuffer.get()+" ");
}
}
}
金手指:
// 实际意义:bytebuffer和json一样,修改了子,父会变
三、NIO(重要)
public class NioClient {
public static void main(String[] args) throws Exception {
//得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
//设置成非阻塞
socketChannel.configureBlocking(false);
//提供服务器Ip和端口
InetSocketAddress inetSocketAddress = new InetSocketAddress(9091);
//连接服务器端
if (!socketChannel.connect(inetSocketAddress)) { //如果连接不上
while (!socketChannel.finishConnect()){
System.out.println("Nio 非阻塞");
}
}
new Thread(new MyRunnable(socketChannel)).start();
while (true) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(buffer);
byte[] array = new byte[buffer.position()];
if (array.length >= 0) System.arraycopy(buffer.array(), 0, array, 0, array.length);
if (read > 0) {
System.out.println(new String(array));
}
}
}
static class MyRunnable implements Runnable{
SocketChannel socketChannel;
public MyRunnable(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
while (true) {
//创建一个Buffer对象并存入数据
Scanner scanner = new Scanner(System.in);
String message = scanner.nextLine();
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
//发送数据
try {
socketChannel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
public class NioServer {
public static void main(String[] args) throws Exception {
//得到serverSocketChannel对象
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//得到Selector对象
Selector selector = Selector.open();
//绑定端口
serverSocketChannel.bind(new InetSocketAddress(9091));
//设置为非阻塞式
serverSocketChannel.configureBlocking(false);
//把ServerSocketChannel注册给Selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//监听连接
while (true) {
if (selector.select(2000) == 0) { // selector轮询
//System.out.println("2秒内没有客户端来连接我");
continue;
}
//得到SelectionKey对象,判断事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey selectionKey : selectionKeys) {
if (selectionKey.isAcceptable()) { //连接事件
System.out.println("有人来连接");
//获取网络通道
SocketChannel clientSocket = serverSocketChannel.accept();
//设置非阻塞式
clientSocket.configureBlocking(false);
//连接上了 注册读取事件
clientSocket.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if (selectionKey.isReadable()) {//读取客户端数据事件
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();
socketChannel.read(byteBuffer); // 读入
// 逻辑 关于array开始
byte[] array = new byte[byteBuffer.position()]; // 新建array
if (array.length >= 0)
System.arraycopy(byteBuffer.array(), 0, array, 0, array.length); // 复制元素放到array中
System.out.println(new String(array)); // 打印array
// 逻辑 关于array结束
byteBuffer.put("你好".getBytes()); // put放入
byteBuffer.flip(); // 翻转,写出或打印的前奏
socketChannel.write(byteBuffer); // 写出
//写完过后要记得clear一下,不然position的值和limit的是一样,下次就会报异常。
byteBuffer.clear(); // 这个buffer同时用于读写,每次循环要清空
}
//手动从当前集合将本次运行完的对象删除
selectionKeys.remove(selectionKey); // iterator或者foreach需要remove
}
}
}
}
金手指:重点是NIOServer,NIOClient和IOClient是一样的,甚至可以不需要IOClient,直接用命令行模拟,对于NIOServer,后面的单线程Reactor就是对其封装并分类。
四、基于NIO的Reactor单线程模式(重要)
public class Main {
// TcpReactor 多线程 为了
// Acceptor 接收连接请求
// TcpHandler read接收请求 write写回给客户端
public static void main(String[] args) {
TcpReactor reactor = null;
try {
reactor = new TcpReactor(1333);
reactor.run(); // 为什么设置里使用run 不是start
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class TcpReactor implements Runnable {
private final ServerSocketChannel serverSocketChannel; // 类变量
// 三要素都要是类变量 selector--channnel--buffer(读写才要buffer,连接不需要buffer)
private final Selector selector; // 类变量
public TcpReactor(int port) throws Exception { // Reactor和之前的nio一样,就是对其封装一次
// 初始五句话:初始化selector 初始化服务端channel 服务端channel绑定端口 服务端channel设置为非阻塞 服务端channel注册到selector上面,返回在selector中表示这个服务端channel的key
// 最后一句:服务端这个channel去绑定一个服务端acceptor对象,用来接收请求的,参数为服务端channel和服务端selector
selector = Selector.open(); // good 这一步不能少 初始化selector 直接静态方法新建
serverSocketChannel = ServerSocketChannel.open(); // 初始化服务端channel 直接静态方法新建
//在ServerSocketChannel绑定监听端口
serverSocketChannel.socket().bind(new InetSocketAddress(port)); // 服务端channel绑定端口
//设置ServerSocketChannel为非阻塞
serverSocketChannel.configureBlocking(false); // 服务端channel设置为非阻塞
// ServerSocketChannel向selector注册一个OP_ACCEPT事件,然后返回该通道的key SelectionKey.OP_ACCEPT 16
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 服务端channel注册到selector上面,返回在selector中表示这个服务端channel的key
//给定key一个附件的Acceptor对象
selectionKey.attach(new Acceptor(serverSocketChannel, selector)); // 服务端这个channel去绑定一个服务端acceptor对象,用来接收请求的,参数为服务端channel和服务端selector
}
@Override
public void run() {
//在线程被中断前持续运行
while (!Thread.interrupted()) { // 当服务端线程没有被打断的时候
System.out.println("Waiting for new event on port:" + serverSocketChannel.socket().getLocalPort() + "..."); // 在服务端端口号等待客户端连接
int len=-1;
//这一句打印表示服务端已经启动,或已经处理完成一次请求,服务端目前处于空闲状态
try {
//若没有事件就绪则不往下执行,NIO的底层是linux非阻塞io,轮询 这里没有必须阻塞
if ((len = selector.select()) == 0) { // 这里启动的时候阻塞,连接客户端才通过,没有客户端连接,这里就是0,也可以通过唤醒来处理
continue; // 既然方法还没执行完就阻塞,就完成不会返回0,那么还需要continue;干什么
}
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(len); // 这里我想打印一下
//取得所有已就绪事件的key集合
Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 当前类变量selector中的key集合 selectedKey 被选择的指定的key Accept read write
Iterator<SelectionKey> it = selectionKeys.iterator(); // 迭代iterator() while
while (it.hasNext()) {
//根据事件的key进行调度
dispatch(it.next()); //分发
it.remove(); // 每次删除一个,和Reactor之前的nio一样,就是对其封装一次
}
}
}
//调度方法,根据事件绑定的对象开新线程
private void dispatch(SelectionKey key) {
//根据事件之key绑定的对象开启线程
Runnable r = (Runnable) key.attachment(); // 三个绑定 服务端启动的使用绑定Accept 客户端连接的时候使用
// 客户端连接的时候绑定READ 客户端输入的时候使用
// 客户端输入的时候绑定WRITE 输出给客户端的时候调用
// 输出给客户端的时候绑定READ 下一次客户端输入的时候使用
// 得到传递多来的selector中的key,所attach绑定的Runnable对象,就是Acceptor implements Runnable
if (r != null) {
r.run(); // 调用Acceptor的run()方法 调用
}
}
}
//接受连接请求线程 Reactor就是对NIO封装,按照该功能分为不同类,对于开发者来说,只要按照Main类中的,新建TcpReactor对象,传入ip:port,调用run()方法就好了,底层就是一个nio good
public class Acceptor implements Runnable {
private final ServerSocketChannel serverSocketChannel; // 两个类变量,都是被TcpReactor的类变量赋值(TcpReactor类的构造函数)
private final Selector selector; // 两个类变量,都是被TcpReactor的类变量赋值(TcpReactor类的构造函数)
public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
this.serverSocketChannel = serverSocketChannel; // 仅仅设置而已,acceptor的run什么时候调用
this.selector = selector;
}
@Override
public void run() {
try {
//接受client连接请求
SocketChannel socketChannel = serverSocketChannel.accept(); // 既然调用就接收请求,
System.out.println(socketChannel.getRemoteAddress().toString() + " is connected."); // 这一句作用是表示启动服务端之后,
// serverSocketChannel.accept(); 是一句阻塞,阻塞在这里不会在运行下去, 一定要客户端连接到了这里才会打印这一句
// 这个阻塞和selector.wakeup()有什么关系:
// 这个服务端channel的阻塞,这个阻塞必须由客户端的连接来打开,但是服务端channel是注册到selector上面的,selector.wakeup()是唤醒这个阻塞吗
// 复制一个Client,启动两个就知道了,看第二能否连接上
if (socketChannel != null) { // 不为空
//设置成非阻塞 good
socketChannel.configureBlocking(false);
//SocketChannel向selector注册一个OP_READ事件,然后返回该通道的key good
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ); //注册读 因为服务端是先读后写
//使一个阻塞住的selector操作立即返回
// selector.wakeup(); //wakeup 唤醒selector中的其中一个,为什么要唤醒,在哪里阻塞,
// selector.wakeup主要是为了唤醒阻塞在selector.select上的线程,在selector.select()后线程会阻塞
//给定可以一个附加的TCPHandler对象
selectionKey.attach(new TcpHandler(selectionKey, socketChannel)); // 这个key attach一个TcpHandler
// 传入两个参数 selectionKey是客户端channel注册到selector的时候获取到,客户端channel是连接得到的
// Acceptor建立连接,TcpHandler就要处理读写请求了
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class TcpHandler implements Runnable { // 也是一个多线程类
private final SelectionKey selectionKey; // 两个类变量,是在Accetpor的run方法中设置的,
// 传入两个参数 selectionKey是客户端channel注册到selector的时候获取到,客户端channel是连接得到的
private final SocketChannel socketChannel;
int state;
public TcpHandler(SelectionKey selectionKey, SocketChannel socketChannel) {
this.selectionKey = selectionKey;
this.socketChannel = socketChannel;
//初始的状态设定为Reading
state = 0; // state表示read还是send run用来判断 先读取客户端的数据,
// 然后三步一体:将标志位修改为state=1,注册写事件,唤醒一个selector,执行run()方法,将数据写到客户端
}
@Override
public void run() { // 这个run()方法什么时候调用 知道了 逻辑就是读写
try {
if (state == 0) {
//读取网络数据
read(); //read send 向对应
} else {
//发送网络数据
send(); // read send 相对应
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("[Warning!] A client has been closed");
closeChannel(); // 发生错误,关闭连接
}
}
private void closeChannel() {
try {
selectionKey.cancel(); //
socketChannel.close(); // selectionKey是客户端channel注册到selector的时候获取到,客户端channel是连接得到的
} catch (IOException e) {
e.printStackTrace();
}
}
private void send() throws Exception {
String str = "Your message has sent to" + socketChannel.socket().getLocalSocketAddress().toString() + "\r\n"; //这是服务端要发送给客户端的响应
//wrap自动把buf的position设为0,所以不需要在flip() good wrap和flip 就是要打印或者write
ByteBuffer buf = ByteBuffer.wrap(str.getBytes());
while (buf.hasRemaining()) {
//回传给client回应字符串,发送buf的position位置到limit位置为止之间的内容
socketChannel.write(buf); // 服务端使用channell来写,客户端使用两个stream读写,因为stream是单向的,
// 服务端使用一个channel读写,因为channel是双向的,api层面既有channel.read,也有channel.write
// 而且每次都是channel.write(buffer) channel.read(buffer) 所以 channel+buffer==stream selector是类变量,服务端全局选择
// 服务端的channel和stream如何联系起来 不用联系,直接静态方法新建channel
// 为什么分为服务端channel和客户端channel
// 服务端channel TcpReactor构造函数中,serverSocketChannel = ServerSocketChannel.open();
// 客户端channel Acceptor类中的run()方法,SocketChannel socketChannel = serverSocketChannel.accept(); // 启动服务端后阻塞在这里
// **服务端channel是打开来给客户端连接的,客户端channel表示服务端获取到的一个客户端连接,表示服务端获取到的一个客户端连接** 理解了 很重要 good
// 所以,服务端channel是open 客户端channel是accept() 源码命名优美
}
// 写完成数据到客户端后,然后三步一体:将标志位修改为state=0,注册读事件,唤醒一个selector,执行run()方法,读取客户端的数据,就是一个循环
//改变状态
state = 0; // 改变状态为0,继续读取客户端的数据
//通过key改变通道注册的事件 SelectionKey.OP_READ 1
selectionKey.interestOps(SelectionKey.OP_READ); // 类变量selectionKey的感兴趣的操作设置为read 下一次使用 TCPHandlers selectedkey就是read
//使一个阻塞住的selector操作立即返回
// selectionKey.selector().wakeup(); // 类变量selectionkey 得到这个key关联的selector,只有一个类变量selector,然后唤醒这个selector
// 调用wakeup()的地方,read send
}
private void read() throws Exception {
//non-blocking下不可用Readers,因为Readers不支持non-blocking
byte[] arr = new byte[1024];
ByteBuffer buffer = ByteBuffer.wrap(arr);
//读取字符串
int numBytes = socketChannel.read(buffer); // 服务端使用channel来读取,
if (numBytes == -1) {
System.out.println("[Warning!] A client has been closed.");
closeChannel(); // 如果读完了,关闭连接,注意不是客户端发送-1,而是客户端发送exit
// 因为客户端发送exit会导致出现client.close,这个时候服务端可以感应到
return;
}
//将读取到的byte内容转为字符串类型
String str = new String(arr);
if ((str != null) && !str.equals(" ")) {
//模拟逻辑处理
process(); // 模拟逻辑处理 good
System.out.println(socketChannel.socket().getRemoteSocketAddress().toString() + ">" + str); // 打印客户端ip:port 及其 数据
// 然后三步一体:将标志位修改为state=1,注册写事件,唤醒一个selector,执行run()方法,将数据写到客户端
//改变状态
state = 1; // 已经读取完毕,就要修改状态,将将会结果发送给客户端
//通过key改变通道注册的事件 SelectionKey.OP_WRITE 4
selectionKey.interestOps(SelectionKey.OP_WRITE); // 类变量selectionKey的感兴趣的操作设置为write 下一次使用 TCPHandlers selectedkey就是write
//使一个阻塞住的selector操作立即返回
// selectionKey.selector().wakeup(); // 类变量selectionkey 得到这个key关联的selector,只有一个类变量selector,然后唤醒这个selector
}
}
void process() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Reactor单线程金手指:
Main --> TcpReactor–> Acceptor --> TcpHandler
对于Main类:开发者只需要传入端口就可以初始化服务端TcpReactor类,然后调用TcpReactor类的run()方法在main线程中启动
这里的TcpReactor Acceptor TcpHandler 都不是多线程,都是调用run(),不要被骗了 // run()方法:
// 是在主线程中执行方法,和调用普通方法一样;(按顺序执行,同步执行)
// start()方法:
// 是创建了新的线程,在新的线程中执行;(异步执行)
Reactor就是对NIO封装,按照该功能分为不同类,对于开发者来说,只要按照Main类中的,新建TcpReactor对象,传入ip:port,调用run()方法就好了,底层就是一个nio good
// TcpReactor 用于客户端连接和分发(acceptor和read/write)
// Acceptor accept接收连接请求
// TcpHandler read接收请求 write写回给客户端
对于TcpReactor类:
变量:selector ServerSocketChannel
// 三要素都要是类变量 selector–channnel–buffer(读写才要buffer,连接不需要buffer)
构造函数:
// 初始五句话:初始化selector 初始化服务端channel 服务端channel绑定端口 服务端channel设置为非阻塞 服务端channel注册到selector上面,返回在selector中表示这个服务端channel的key
// 最后一句:服务端这个channel去绑定一个服务端acceptor对象,用来接收请求的,参数为服务端channel和服务端selector
TcpReactor类的run()方法
// 当服务端线程没有被打断的时候
selector.select() //若没有事件就绪则不往下执行,NIO的底层是linux非阻塞io,轮询
//取得所有已就绪事件的key集合
// 当前类变量selector中的key集合 selectedKey 被选择的指定的key Accept read write
// 迭代iterator() while
//根据事件的key进行分发调度
// 每次删除一个,和Reactor之前的nio一样,就是对其封装一次
TcpReactor类的dispatch()方法
// 三个绑定 TcpReactor类:服务端启动的使用绑定Accept,客户端连接的时候使用
// Accept类:客户端连接的时候绑定READ,客户端输入的时候使用
// TcpHandler类:客户端输入的时候切换为WRITE,输出给客户端的时候调用
// TcpHandler类:输出给客户端的时候切换为READ,下一次客户端输入的时候使用
// 得到传递多来的selector中的key,所attach绑定的Runnable对象,就是Acceptor implements Runnable
对于Accept类:
// 两个类变量,都是被TcpReactor的类变量赋值(TcpReactor类的构造函数)
对于TcpHandler类:
TcpHandler类的send()方法
// 服务端使用一个channel读写,因为channel是双向的,api层面既有channel.read,也有channel.write
// 而且每次都是channel.write(buffer) channel.read(buffer) 所以 channel+buffer==stream selector是类变量,服务端全局选择
// 服务端的channel和stream如何联系起来 不用联系,直接静态方法新建channel
// 为什么分为服务端channel和客户端channel
// 服务端channel TcpReactor构造函数中,serverSocketChannel = ServerSocketChannel.open();
// 客户端channel Acceptor类中的run()方法,SocketChannel socketChannel = serverSocketChannel.accept(); // 启动服务端后阻塞在这里
// 服务端channel是打开来给客户端连接的,客户端channel表示服务端获取到的一个客户端连接,表示服务端获取到的一个客户端连接,重要,写两遍 理解了 很重要 good
// 所以,服务端channel是open 客户端channel是accept() 源码命名优美
TcpHandler类的read()方法
// 如果读完了,关闭连接,注意不是客户端发送-1,而是客户端发送exit
// 因为客户端发送exit会导致出现client.close,这个时候服务端可以感应到
五、基于NIO的Reactor多线程模式(重要)
HandlerState三个实现类,WorkState用来修改状态,ReadState用来完成服务端读出客户端传输的数据的操作,WriteState用来完成服务端写数据到客户端操作,将HandlerState类型的引用注册到TCPHandler类里面,其run()方法中直接调用
state.handle(this, sk, sc, pool);
这个模型中,TCPReactor单线程,Acceptor和TCPHandler里面,虽然实现Runnable接口,但是都只是在TCPReactor类中调用它们的run()方法,还是在main线程中执行其run()方法,但是TCPHandler里面有一个线程池,state.handle(this, sk, sc, pool); 表示读写操作是线程池中完成,这是这个模式的关键。
正好符合Reactor线程池
(1)服务端用于接收客户端连接的是个 1 个单独的 NIO 线程,然后,一组 NIO 线程上面完成所有的 IO 操作。
(2)Reactor 多线程模型有专门一个NIO 线程——Acceptor 线程用于监听服务端,接收客户端的 TCP 连接请求; 网络 IO 操作-读、写等由一个 NIO 线程池负责,线程池可以采用标准的 JDK 线程池实现,它包含一个任务队列和 N个可用的线程,由这些 NIO 线程负责消息的读取、解码、编码和发送。当目前为止,还是没有用到synchronized同步
六、Reactor主从多线程模式(重要)
金手指:Reactor主从线程池
含义:服务端用于接收客户端连接的是一个独立的 NIO 线程池,而且由一组 NIO 线程上面完成所有的 IO 操作。
Acceptor 接收到客户端 TCP 连接请求处理完成后(可能包含接入认证等),将新创建的SocketChannel 注册到 IO 线程池(sub reactor 线程池)的某个 IO 线程上,由它负责SocketChannel 的读写和编解码工作。Acceptor 线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端 subReactor 线程池的 IO 线程上,由 IO 线程负责后续的 IO 操作。
一定要搞懂Reactor主从线程模式,这个SubReactor线程池是干什么用的?
第零个good 基本概念(三种Reactor模式通用)
Reactor:把IO事件分配给对应的handler处理
Acceptor:处理客户端连接事件
Handler:处理非阻塞的任务
第四个good
单线程Reactor:
Reactor:把IO事件分配给对应的handler处理
Acceptor:处理客户端连接事件(单线程处理连接事件,Acceptor实现Runnable接口忽略)
Handler:处理非阻塞的任务(单线程处理读写事件,Handler实现Runnable接口忽略)
多线程Reactor:
Reactor:把IO事件分配给对应的handler处理
Acceptor:处理客户端连接事件(单线程处理连接事件,Acceptor实现Runnable接口忽略)
Handler:处理非阻塞的任务(Handler类中线程池处理读写事件,Handler实现Runnable接口忽略)
主从多线程Reactor:
Reactor:把IO事件分配给对应的handler处理
Acceptor:处理客户端连接事件(Acceptor类中线程池处理连接事件,Acceptor实现Runnable接口忽略)
Handler:处理非阻塞的任务(Handler类中线程池处理读写事件,Handler实现Runnable接口忽略)
梳理下基于主从Reactor多线程模型的事件处理过程:
Reactor主线程对象通过select监听连接事件,通过Acceptor处理连接事件
第一个good:当Acceptor处理连接事件后,主reactor将连接分配给从Reactor
从Reactor将连接加入到连接队列进行监听,并创建handler进行各种事件处理
当有新事件发生时,从reactor就会对用对应的handler处理
handler读取数据后,分发给后面的worker线程处理
worker线程池分配独立的worker线程进行处理并返回结果
handler收到结果后再讲结果返回给客户端
可以看到,在主从Reactor多线程模型中,父线程与子线程之间数据交互简单、责任明确,父线程只需接收新连接,后续的处理交给子线程完成即可;主从Reactor多线程模型中,Reactor线程拆分为mainReactor和subReactor两个部分,第二个good:mainReactor只处理连接事件,读写事件交给subReactor来处理。业务逻辑还是由线程池来处理,mainRactor只处理连接事件,用一个线程来处理就好。处理读写事件的subReactor个数一般和CPU数量相等,一个subReactor对应一个线程,业务逻辑由线程池处理
七、面试金手指(重要,面试语言组织,文字为主,当被问到IO的时候)
面试语言组织:(背下来,理解后背诵很简单,理解和防范面试官的问题)
三个要素
Reactor模型是基于事件驱动的线程模型,可以分为Reactor单线程模型、Reactor多线程模型、主从Reactor多线程模型,通常基于在I/O多路复用实现。不同的角色职责有:Dispatcher负责事件分发、Acceptor负责处理客户端连接、Handler处理非连接事件(例如:读写事件)。
一、Reactor单线程模型
1、原理图示
在Reactor单线程模型中,操作在同一个Reactor线程中完成。根据事件的不同类型,由Dispatcher将事件转发到不同的角色中处理。连接事件转发到Acceptor处理、读写事件转发到不同的Handler处理。
2、实现图示
NIO实现中,可以将Accept事件注册到select选择器中,轮询是否有“接受就绪”事件。如果为“连接就绪”分发给Acceptor角色处理;“写就绪”事件分发给负责写的Handler角色处理;“读就绪”事件分发给负责读的Handler角色处理。这是事情都在一个线程中处理。
二、Reactor多线程模型
1、原理图示
在Reactor多线程模型中。根据事件的不同类型,由Dispatcher将事件转发到不同的角色中处理。连接事件转发到Acceptor单线程处理、读写事件转发到不同的Handler由线程池处理。
2、实现图示
NIO实现中,可以将Accept事件注册到select选择器中,轮询是否有“接受就绪”事件。如果为“连接就绪”分发给Acceptor角色处理,此处处理“连接就绪”为一个线程;“写就绪”事件分发给负责写的Handler角色由线程池处理;“读就绪”事件分发给负责读的Handler角色由线程池处理。
三、主从Reactor多线程模型
1、原理图示
Reactor多线程模型,由Acceptor接受客户端连接请求后,创建SocketChannel注册到Main-Reactor线程池中某个线程的Select中;具体处理读写事件还是使用线程池处理(Sub-Reactor线程池)。
2、实现图示
将Accept事件注册到select选择器中,轮询是否有“接受就绪”事件;“连接就绪”分发给Acceptor角色处理,创建新的SocketChannel转发给Main-Reactor线程池中的某个线程处理;在指定的Main-Reactor某个线程中,将SocketChannel注册读写事件;当“写就绪/读就绪”事件分别由线程池(Sub-Reactor线程池)处理。
三要素
八、小结
从NIO到Reactor三种模式,完成了。
源工程代码:
https://download.csdn.net/download/qq_36963950/12713959
上一篇: PCL ——(4)点云文件读写
下一篇: 相机深度转点云公式原理