欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

深入学习Netty(2)——传统NIO编程

程序员文章站 2022-03-06 11:06:45
前言 学习Netty编程,避免不了从了解Java 的NIO编程开始,这样才能通过比较让我们对Netty有更深的了解,才能知道Netty大大的好处。传统的NIO编程code起来比较麻烦,甚至有遗留Bug,但其中最基本的思想是一致的。 参考资料《Netty In Action》、《Netty权威指南》( ......

前言

  学习netty编程,避免不了从了解java 的nio编程开始,这样才能通过比较让我们对netty有更深的了解,才能知道netty大大的好处。传统的nio编程code起来比较麻烦,甚至有遗留bug,但其中最基本的思想是一致的。

  参考资料《netty in action》、《netty权威指南》(有需要的小伙伴可以评论或者私信我)

  博文中所有的代码都已上传到github,欢迎star、fork

 


 

一、nio 核心组件

  nio,有人称之为new i/o,这是官方叫法。但是由于之前老的i/o类库是阻塞i/o,所以此时的nio也可以是非阻塞i/o(non-block i/o)

  与socket类和serversocket类相对应,nio提供了socketchannel和serversocketchannel不同的套接字通道实现,可以支持阻塞和非阻塞两种模式

  nio库是jdk 1.4中引入的,弥补了原来同步阻塞i/o的不足。这是因为提供了高速处理、面向块的i/o,主要包括:缓冲区buffer、通道channel、多路复用器selector

1.缓冲区buffer

  在nio库中,所有的数据都是缓冲区处理的,读取数据时直接读取缓冲区;在写入数据时,写入到缓冲区。在任何时候访问nio中的数据,都是通过缓冲区进行操作实际上缓冲区是一个数组,有不同类型的数组,通常是字节数组(bytebuffer),但它不仅仅是一个数组,缓冲区提供对数据的结构化访问以及维护读写位置(limit)等信息

  深入学习Netty(2)——传统NIO编程 

2.通道channel

  网络数据通过channel双向读取和写入(全双工),这点不同于stream(inputstream/outputstream或者其子类)一个方向上移动。

  channel可以分类两个大类:用于网络读写的selectablechannel和用于文件操作的filechannel

  serversocketchannel和socketchannel都是selectablechannel的子类。

  深入学习Netty(2)——传统NIO编程

3.多路复用器selector

  多路复用器提供选择已经就绪的任务的能力,具体来说:selector会不断地轮询注册在其上的channel,如果某个channel上面发生读写事件,就表明这个channel处于就绪状态,会被selector轮询出来,通过selectionkey可以获取就绪的channel的集合,进行后续的i/o操作。这样就意味着只需要一个线程负责selector轮询,就可以接入成千上万的客户端。

  多路复用器selector是最核心的组件,在netty编程中也是尤为重要的,但是这里不具体展开,到时候分析netty源码的时候会具体介绍。

二、nio服务端

1.服务端序列图

先放出如下的nio服务端序列图,结合序列图给具体的步骤如下,之后的示例代码中也会有详细注释

  深入学习Netty(2)——传统NIO编程

 

第一步:打开serversocketchannel,用于监听客户端的连接,是所有客户端连接的父管道。

第二步:绑定监听端口,设置连接为非阻塞模式

第三步:创建reactor线程,创建多路复用器并启动线程

第四步:将serversocketchannel注册到reactor线程的多路复用器selector上,监听accpet事件。

第五步:多路复用器在线程run方法在无线循环体内轮询准备就绪的key。

第六步:多路复用器监听到有新的客户端接入,处理新的接入请求,完成tcp三次握手,建立物理链路。

第七步:设置客户端链路为非阻塞模式

第八步:将新接入的客户端注册到reactor线程的多路复用器上,监听读操作,读取客户端发送的网络消息。

第九步:异步读取客户端请求消息到缓冲区

第十步:对bytebuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成task,交给业务线程池中,进行业务处理

第十一步:将对象encode成bytebuffer,调用socketchannel的异步write接口,将消息异步发送给客户端

2.服务端代码示例

(1)多路复用服务multiplexertimeserver

public class multiplexertimeserver implements runnable {

    private selector selector;

    private serversocketchannel servchannel;

    private volatile boolean stop;

    /**
     * 初始化多路复用器、绑定监听端口
     *
     * @param port
     */
    public multiplexertimeserver(int port) {
        try {
            // 1. 打开serversocketchannel,监听客户端连接
            servchannel = serversocketchannel.open();
            // 2. 绑定监听端口,设置连接为非阻塞模式
            servchannel.socket().bind(new inetsocketaddress(port), 1024);
            servchannel.configureblocking(false);
            // 3. 创建reactor线程,创建多路复用并启动线程
            selector = selector.open();
            // 4. 将serversocketchannel注册到reactor线程的多路了复用器selector,监听accept事件
            servchannel.register(selector, selectionkey.op_accept);
            system.out.println("the time server is start in port : " + port);
        } catch (ioexception e) {
            e.printstacktrace();
            system.exit(1);
        }
    }

    public void stop() {
        this.stop = true;
    }


    @override
    public void run() {
        while (!stop) {
            try {
                selector.select(1000);
                set<selectionkey> selectedkeys = selector.selectedkeys();
                iterator<selectionkey> it = selectedkeys.iterator();
                selectionkey key = null;
                // 循环轮询准备就绪的key
                while (it.hasnext()) {
                    key = it.next();
                    it.remove();
                    try {
                        // deal with i/o event
                        handleinput(key);
                    } catch (exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (throwable t) {
                t.printstacktrace();
            }
        }
        // 多路复用器关闭后,所有注册在上面的channel和pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if (selector != null) {
            try {
                selector.close();
            } catch (ioexception e) {
                e.printstacktrace();
            }
        }
    }

    private void handleinput(selectionkey key) throws ioexception {
        if (key.isvalid()) {
            // 处理新接入的请求消息
            if (key.isacceptable()) {
                // a connection was accepted by a serversocketchannel
                serversocketchannel ssc = (serversocketchannel) key.channel();
                // 6. 监听到新的客户端接入,处理新的接入请求我,完成tcp三次握手-->建立链路
                socketchannel sc = ssc.accept();
                // 7. 设置客户端链路为非阻塞模式
                sc.configureblocking(false);
                sc.socket().setreuseaddress(true);
                // 8. 将新接入的客户端连接注册到reactor线程的多路复用器上,监听读操作,读取客户端发送的消息
                sc.register(selector, selectionkey.op_read);
            }
            if (key.isreadable()) {
                // a channel is ready for reading
                socketchannel sc = (socketchannel) key.channel();
                bytebuffer readbuffer = bytebuffer.allocate(1024);
                // 9. 异步读取客户端请求消息到缓冲区
                int readbytes = sc.read(readbuffer);
                if (readbytes > 0) {
                    readbuffer.flip();
                    // 10. 读取解码报文
                    byte[] bytes = new byte[readbuffer.remaining()];
                    readbuffer.get(bytes);
                    string body = new string(bytes, "utf-8");
                    system.out.println("the time server receive order : " + body);
                    string currenttime = "query time order"
                            .equalsignorecase(body) ? new java.util.date(
                            system.currenttimemillis()).tostring()
                            : "bad order";
                    dowrite(sc, currenttime);
                } else if (readbytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else {
                    // 读到0字节,忽略
                }
            }
        }
    }

    private void dowrite(socketchannel channel, string response)
            throws ioexception {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getbytes();
            bytebuffer writebuffer = bytebuffer.allocate(bytes.length);
            writebuffer.put(bytes);
            writebuffer.flip();
            channel.write(writebuffer);
        }
    }
}

(2)nio服务timeserver

public class timeserver {

    public static void main(string[] args) {
        int port = 8084;
        multiplexertimeserver timeserver = new multiplexertimeserver(port);
        new thread(timeserver, "nio-timeserver").start();
    }
}

(3)开启服务端

运行timeserver:

深入学习Netty(2)——传统NIO编程

使用netstat命令查看是否对8084端口开启监听

深入学习Netty(2)——传统NIO编程

三、nio客户端

1.客户端序列图

深入学习Netty(2)——传统NIO编程

第一步:打开socketchannel,绑定客户端本地地址(可选,默认系统会随机会分配一个可用的本地地址)

第二步:设置socketchannel为非阻塞模式,同时设置客户端连接的tcp参数

第三步:异步连接服务端

第四步:判断是否连接成功,如果连接成功则直接注册读状态位到多路复用中。如果没有当前没有连接成功(异步连接,返回false,说明客户端已经发送sync包,服务端没有返回ack包,物理链路还没建立)

第五步:向reactor线程的多路复用op_connect状态位,监听服务端的tcp ack应答

第六步:创建reactor线程,创建多路复用器并启动线程。

第七步:多路复用在线程run方法无线循环体内轮询准备就绪的key

第八步:接收connect事件进行处理

第九步:判断连接结果,如果连接成功,注册读事件到多路复用器,

第十步:注册读事件到多路复用器

第十一步:异步读客户端请求消息到缓冲区

第十二步:对bytebuffer进行编解码

第十三步:将pojo对象encode成bytebuffer,调用socketchannel的异步write接口,将消息异步发送给客户端。

2.客户端示例代码

(1)客户端处理timeclienthandle

public class timeclienthandle implements runnable {

    private string host;
    private int port;
    private selector selector;
    private socketchannel socketchannel;
    private volatile boolean stop;

    public timeclienthandle(string host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
            // 创建多路复用器并打开
            selector = selector.open();
            // 1.打开socketchannel,
            socketchannel = socketchannel.open();
            // 2.设置socketchannel非阻塞模式, 这里不设置tcp参数
            socketchannel.configureblocking(false);
        } catch (ioexception e) {
            e.printstacktrace();
            system.exit(1);
        }
    }


    @override
    public void run() {
        try {
            // 连接服务端
            doconnect();
        } catch (ioexception e) {
            e.printstacktrace();
            system.exit(1);
        }
        while (!stop) {
            try {
                // 6. 多路复用器在线程run方法的无限循环体内轮询准备就绪的key
                selector.select(1000);
                set<selectionkey> selectedkeys = selector.selectedkeys();
                iterator<selectionkey> it = selectedkeys.iterator();
                selectionkey key = null;
                while (it.hasnext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleinput(key);
                    } catch (exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (exception e) {
                e.printstacktrace();
                system.exit(1);
            }
        }

        // 多路复用器关闭后,所有注册在上面的channel和pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if (selector != null) {
            try {
                selector.close();
            } catch (ioexception e) {
                e.printstacktrace();
            }
        }
    }


    /**
     * 处理客户端输入
     *
     * @param key
     * @throws ioexception
     */
    private void handleinput(selectionkey key) throws ioexception {

        if (key.isvalid()) {
            // 判断是否连接成功
            socketchannel sc = (socketchannel) key.channel();
            // 7. 接收connect事件进行处理
            if (key.isconnectable()) {
                // 8. 如果连接完成则注册读事件到多路复用器
                if (sc.finishconnect()) {
                    sc.register(selector, selectionkey.op_read);
                    dowrite(sc);
                } else {
                    system.exit(1);// 连接失败,进程退出
                }
            }
            if (key.isreadable()) {
                bytebuffer readbuffer = bytebuffer.allocate(1024);
                // 9. 异步读客户端请求消息到缓冲区
                int readbytes = sc.read(readbuffer);
                if (readbytes > 0) {
                    readbuffer.flip();
                    byte[] bytes = new byte[readbuffer.remaining()];
                    readbuffer.get(bytes);
                    string body = new string(bytes, "utf-8");
                    system.out.println("now is : " + body);
                    this.stop = true;
                } else if (readbytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else {
                    // 读到0字节,忽略
                }
            }
        }

    }

    private void doconnect() throws ioexception {
        // 3. 异步连接客户端
        boolean connected = socketchannel.connect(new inetsocketaddress(host, port));
        if (connected) {
            // 4. 返回true则直接连接成功,则注册到多路复用器上,发送请求消息,读应答
            socketchannel.register(selector, selectionkey.op_read);
            dowrite(socketchannel);
        } else {
            // 5. 如果返回false,则说明此时链路还没有建立,则注册op_connect状态位,监听服务端的tcp ack应答
            socketchannel.register(selector, selectionkey.op_connect);
        }
    }

    private void dowrite(socketchannel sc) throws ioexception {
        byte[] req = "query time order".getbytes();
        bytebuffer writebuffer = bytebuffer.allocate(req.length);
        writebuffer.put(req);
        writebuffer.flip();
        sc.write(writebuffer);
        if (!writebuffer.hasremaining()) {
            system.out.println("send order to server succeed.");
        }
    }
}

(2)nio客户端timeclient

public class timeclient {

    public static void main(string[] args) {
        int port = 8084;
        new thread(new timeclienthandle("127.0.0.1", port), "nio-timeclient").start();
    }
}

(3)运行客户端

运行timeclient:

深入学习Netty(2)——传统NIO编程

此时服务端console:

深入学习Netty(2)——传统NIO编程

四、nio编程的优点

1.nio编程的优势与缺点

(1)客户端发起的连接操作是异步的

  可以通过在多路复用器注册op_connect等待后续结果,不需要像之前的客户端被同步阻塞。

(2)socketchannel的读写操作都是异步的

  如果没有可读写数据不会等待直接返回,i/o通信线程就可以处理其他链路,不需要同步等待链路可用。

(3)线程模型的优化

  selector在linux等主流系统上是通过epoll实现,没有连接句柄的限制,意味着一个selector可以处理成千上万的客户端连接,而且性能不会降低

(4)同步非阻塞通信

  nio需要开启线程不断循环去获取操作结果,看起来不是很明智,真正有效的应该是基于异步回调获取结果的,jdk 1.7以后就提供了异步非堵塞的io操作方式,所以人们叫它 aio(asynchronous io),异步 io 是基于事件和回调机制实现的。

2.selector基本工作原理

  首先,需要将 channel 注册到 selector 中,这样 selector 才知道哪些 channel 是它需要管理的。之后,selector 会不断地轮询注册在其上的 channel 。如果某个 channel 上面发生了读或者写事件,这个 channel 就处于就绪状态,会被 selector 轮询出来,然后通过 selectionkey 可以获取就绪 channel 的集合,进行后续的 i/o 操作。

  关于selector操作的代码示例模板:

    // 创建 selector
    selector selector = selector.open();
    channel.configureblocking(false);
    // 注册 channel 到 selector 中
    selectionkey key = channel.register(selector, selectionkey.op_read);
    while(true) {
        // 通过 selector 选择 channel 
        int readychannels = selector.select();
        if (readychannels == 0) {
            continue;
        }
        // 获得可操作的 channel
        set selectedkeys = selector.selectedkeys();
        // 遍历 selectionkey 数组
        iterator<selectionkey> keyiterator = selectedkeys.iterator();
        while (keyiterator.hasnext()) {
            selectionkey key = keyiterator.next();
            if (key.isacceptable()) {
                // a connection was accepted by a serversocketchannel.
            } else if (key.isconnectable()) {
                // a connection was established with a remote server.
            } else if (key.isreadable()) {
                // a channel is ready for reading
            } else if (key.iswritable()) {
                // a channel is ready for writing
            }
            // 移除
            keyiterator.remove();
        }
    }