欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Reactor学习

程序员文章站 2022-05-22 20:34:46
...

Reactor

我们需要先了解下NIO的基础和概念。
NIO相对于BIO是非阻塞的API。NIO有三个核心组件分别是:
			Buffer缓冲区
			channel通道
			selector选择器

Buffer缓冲区
缓冲区的本质是一个可以写入数据的内存模块,可以再次读取。
使用Buffer进行数据写入和读取,有四个步骤:
1、将数据写入缓冲区,
2、调用buffer.filp(),转为读取模式
3、缓冲区读取数据
4、调用buffer.clear()或者buffer.compact()清除缓存区
channel通道
bio的管道是outStream和inputStream直接对对接。Nio是channel和channel对接。如下图:
Reactor学习
主要是socketchannel和serversocketcannel.
socketchannel用户tcp的网络连接。对应客户端而言是客户端主动发起和客户端的链接。对应服务端是服务的获取新的链接。
SocketChannel socketChannel =SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(“127.0.0.1”,9030));

//创建网络请求
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);//设置非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(9030));//绑定端口
System.out.println(“启动成功”);

selector选择器
实现了单个线程可以管理多个通道,从而管理多个网络连接。
一个线程selector监听多个channel的不同事件:
1、connect连接
2、accept准备就绪
3、read读取
4、write写入

Selector 一般称 为选择器 ,当然你也可以翻译为 多路复用器 。
通道触发了一个事件意思是该事件已经就绪。比如某个Channel成功连接到另一个服务器称为“ 连接就绪 ”。一个Server Socket Channel准备好接收新进入的连接称为“ 接收就绪 ”。一个有数据可读的通道可以说是“ 读就绪 ”。等待写数据的通道可以说是“ 写就绪 ”。

单线程模型Reactor
所有的IO操作都在同一个NIO线程上完成的。单线程NIO处理监听请求、接受读请求、处理业务逻辑、写请求等。
作为服务端的Reactor:
1、接受客户端的tcp请求
2、读取通信端的请求和应答信息
3、向通信端发送请求。
单reacotor的模型如下:

Reactor学习
Reactor 多线程模型
Reactor多线程模型与单线程模型最大的区别在于,IO处理线程不再是一个线程,而是一组NIO处理线程。原理如下图所示
Reactor学习

其特点:
1)有一个acceptor线程监听客户端 的tcp请求
2)读写IO线程,由专门的线程池负责读写。这些io读写的线程池负责读取、发送数据
3)一个acceptor线程可以处理N个链路,但是一个链路只有一个accptor线程

主从Reactor 多线程模型
主从Reactor线程模型的特点是:服务端用于接收客户端连接的不再是一个单独的NIO线程,而是一个独立的NIO的线程池。Acceptor接收到客户端TCP连接请求并处理完成后(可能包含接入认证),再将新创建的SocketChannel注册到IO线程池(sub reactor)的某个IO处理线程上并处理编解码和读写工作。Acceptor线程池仅负责客户端的连接与认证,一旦链路连接成功,就将链路注册到后端的sub Reactor的IO线程池中。 线程模型图如下
Reactor学习


package com.wm.test.net.mynio;


import com.study.hc.net.nio.NIOServerV3;
import com.sun.org.apache.bcel.internal.generic.Select;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 多路复用选择器 是selector
 *
 * reactor线程模型,一般是2种
 * 一种是单reactor模式
 *    reactor负责处理accept,和读、写就绪
 *    handle负责处理读数据和写数据, workspace工作线程负责处理业务
 *
 * 二种是多线程reator模式
 *   mainreactor是负责处理accept,读写就绪
 *   subreactor线程负责处理读、写数据
 *   workspace工作线程负责处理业务
 *
 *
 */

public class NIOServierV3{

    /** 处理业务操作的线程 */
    private static ExecutorService workPool = Executors.newCachedThreadPool();

    /**只有1个serverchannel 和多个客户端client 连接**/
    private ServerSocketChannel serverSocketChannel;

    /**  1、创建多个线程 - accept处理reactor线程 (accept线程)**/
    private ReactorThread[] mainReactorThread = new ReactorThread[1];
    private ReactorThread[] subReactorThread= new ReactorThread[8];



    /** ReactorThread 线程,封装着,selector,accept和reader 以及 serverSocketChannel 和 SocketChannel
     * 注册监听事件中**/

    abstract  class ReactorThread extends Thread{
        Selector selector;//多路复用选择器
        LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();//单独注册channle用的

        //seletor 监听到事件后,调用这个方法
        public abstract void handle(SelectableChannel selectableChannel)throws Exception;

        //初始化选择器
        public ReactorThread() throws IOException {
            this.selector=Selector.open();
        }

        volatile  boolean running =false;

        private void doStart(){
            if(!running){
                running=true;
                super.start();//启动线程
            }
        }

        private SelectionKey regist(SelectableChannel selectableChannel) throws Exception {


           // selectableChannel.register(selector,0,selectableChannel);//频道注册到选择器上,模式是0

            // 因为线程在执行channel注册到selector的过程中,会和调用selector.select()方法的线程争用同一把锁

            // 代码里select()方法实在eventLoop中通过while循环调用的,争抢的可能性很高,为了让register能更快的执行,就放到同一个线程来处理
            //必须等任务都注册完成后,才能调用select()

            //上面的方法改为由线程任务提交模式

            FutureTask<SelectionKey> task = new FutureTask<SelectionKey>(()-> selectableChannel.register(selector, 0, selectableChannel));
            taskQueue.add(task);
            return task.get();
        }




        @Override
        public void run() {
            while(running){

                Runnable runnable;

                while((runnable=taskQueue.poll())!=null){
                    runnable.run();
                }
                try {
                    selector.select(1000);
                    //取出所有的事件集合

                    Set<SelectionKey> selected  =   selector.selectedKeys();
                    Iterator<SelectionKey> iterator= selected.iterator();

                    while(iterator.hasNext()){
                        SelectionKey key = iterator.next();
                        //key对应的时间不同的注册事件不一样
                        iterator.remove();
                        int readyOps = key.readyOps();

                        //public final boolean isReadable()
                        //测试此键的通道是否已准备好进行读取。
                        //调用此方法的形式为 k.isReadable() ,该调用与以下调用的作用完全相同:
                       /* if((readyOps & (SelectionKey.OP_ACCEPT | SelectionKey.OP_READ))!=0 || readyOps==0){



                        }*/
                        //上面的用户相同
                       if(key.isReadable() || key.isAcceptable() || readyOps==0){

                           SelectableChannel selectableChannel= (SelectableChannel) key.attachment();
                           selectableChannel.configureBlocking(false);

                           try {
                               handle(selectableChannel);
                               //如果是serverchannel则是需要accept获取socketchannel和 注册读事件
                                    //还有将serverchannel分配给subReactor去处理
                               //如果是socketchannel则需要直接读取数据,做业务逻辑处理

                               if(!selectableChannel.isOpen()){
                                   key.cancel(); // 如果关闭了,就取消这个KEY的订阅
                               }

                           } catch (Exception e) {
                               e.printStackTrace();
                               key.cancel(); // 如果有异常,就取消这个KEY的订阅
                           }
                       }


                    }
                    //while的外面 与 selector.select并列
                    selector.selectNow();
                } catch (IOException e) {
                    e.printStackTrace();
                }


            }

        }
    }


    /**
     * 绑定端口
     *
     * @throws IOException IO异常
     */
    private void bind() throws IOException {

        //  1、 正式绑定端口,对外服务
        serverSocketChannel.bind(new InetSocketAddress(8080));
        System.out.println("启动完成,端口8080");


    }


    /**
     * 初始化线程组
     */
    private void newMain和subReactorGroup() throws IOException {

        // 创建mainReactor线程, 只负责处理serverSocketChannel
        for (int i = 0; i < mainReactorThread.length; i++) {
            AtomicInteger incr = new AtomicInteger(0);
            mainReactorThread[i] = new ReactorThread() {
                @Override
                public void handle(SelectableChannel selectableChannel) throws Exception {
                    // 只做请求分发,不做具体的数据读取
                    ServerSocketChannel ch = (ServerSocketChannel) selectableChannel;
                    SocketChannel socketChannel= ch.accept();
                    socketChannel.configureBlocking(false);

                    // 收到连接建立的通知之后,分发给I/O线程继续去读取数据
                    int index = incr.getAndIncrement() % subReactorThread.length;
                    //ReactorThread workEventLoop = subReactorThread[index];

                    ReactorThread ioReactor=subReactorThread[index];
                    ioReactor.doStart();//io线程跑起来 和 io线程注册 是分开的,其实里面一个select()和rigitst里面都是用到锁的
                    //在regist用的是线程注册,futuretask是阻塞的,等dostart线程里面的taskqune任务run之后可以获取
                    SelectionKey selectionKey = ioReactor.regist(socketChannel);//是注册频道和channel、selector关联
                    selectionKey.interestOps(SelectionKey.OP_READ);
                    System.out.println(Thread.currentThread().getName() + "收到新连接 : " + socketChannel.getRemoteAddress());

                }
            };


        }
        // 创建IO线程,负责处理客户端连接以后socketChannel的IO读写
        for (int i = 0; i < subReactorThread.length; i++) {
            subReactorThread[i]=new  ReactorThread(){


                @Override
                public void handle(SelectableChannel selectableChannel) throws Exception {
                    // work线程只负责处理IO处理,不处理accept事件
                    SocketChannel ch = (SocketChannel) selectableChannel;
                    ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
                    while (ch.isOpen() && ch.read(requestBuffer) != -1) {
                        // 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)
                        if (requestBuffer.position() > 0) break;
                    }
                    if (requestBuffer.position() == 0) return; // 如果没数据了, 则不继续后面的处理
                    requestBuffer.flip();
                    byte[] content = new byte[requestBuffer.limit()];
                    requestBuffer.get(content);
                    System.out.println(new String(content));
                    System.out.println(Thread.currentThread().getName() + "收到数据,来自:" + ch.getRemoteAddress());

                    // TODO 业务操作 数据库、接口...
                    workPool.submit(() -> {
                    });

                    // 响应结果 200
                    String response = "HTTP/1.1 200 OK\r\n" +
                            "Content-Length: 11\r\n\r\n" +
                            "Hello World";
                    ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                    while (buffer.hasRemaining()) {
                        ch.write(buffer);
                    }
                }
            };

        }

    }

    /**
     * 初始化channel,并且启动一个eventLoop线程,
     * 服务端需要启动serversockert
     *
     * @throws IOException IO异常
     */
    private void initAndRegister() throws Exception {

        // 1、 创建ServerSocketChannel
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        // 2、 将serverSocketChannel注册到selector
        int index = new Random().nextInt(mainReactorThread.length);
        mainReactorThread[index].doStart();
        SelectionKey selectionKey = mainReactorThread[index].regist(serverSocketChannel);
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);
    }






}