欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty学习笔记之用NIO实现一个echo服务器

程序员文章站 2022-04-20 08:30:48
文章目录需求分析代码实战增加需求代码改进需求分析了解了NIO以及其组件,下面我要用NIO编程知识来实现一个echo服务器。所谓echo服务器,及客户端像给服务器发送了什么消息,服务器就发回什么消息。下面我们来尝试实现这个服务器。代码实战话不多说,先直接上代码。public class MainDemo { public static void main(String[] args) throws IOException { //创建socket通道对象,java.nio...

需求分析

了解了NIO以及其组件,下面我要用NIO编程知识来实现一个echo服务器。

所谓echo服务器,及客户端像给服务器发送了什么消息,服务器就发回什么消息。

下面我们来尝试实现这个服务器。

代码实战

话不多说,先直接上代码。

public class MainDemo1 {
    //处理拿到可读事件的socket
    static class clientProcessor implements Runnable{
        private Selector selector;
        public clientProcessor(Selector selector){
            this.selector = selector;
        }
        @Override
        public void run() {
            while (true){
                try {
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()){
                        SelectionKey key = iterator.next();
                        if (key.isValid() == false)
                        {
                            continue;
                        }
                        if (key.isReadable())
                        {//代码①
                           ByteBuffer byteBuffer = ByteBuffer.allocate(16);
                           SocketChannel clientChannel  = (SocketChannel) key.channel();
                           int read = clientChannel.read(byteBuffer);
                           if (read == -1){
                               key.cancel();
                               clientChannel.close();
                           }else {
                               byteBuffer.flip();
                               byte[] bytes = new byte[byteBuffer.limit()];
                               byteBuffer.get(bytes);
                               System.out.println(bytes);
                           }
                           iterator.remove();
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) throws IOException {
        //新建服务端管道对象并设置为非阻塞
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(9999));
        serverSocketChannel.configureBlocking(false);
        //基于机器性能创建选择器数组
        final Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
        //初始化数组,依次启动线程
        for (int i = 0; i < selectors.length; i++) {
            final Selector selector = Selector.open();
            selectors[i] = selector;
            new Thread(new clientProcessor(selector)).start();
        }
        AtomicInteger id = new AtomicInteger();
        Selector selector = Selector.open();
        serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
        while (true){
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()){
                iterator.next();
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.register(selectors[id.getAndIncrement()%selectors.length], SelectionKey.OP_READ);
                iterator.remove();
            }
        }
    }
}


如上代码,已经可以初步实现我们所需要的服务器,分析一下代码。

先创建一个服务端管道,服务端管道注册选择器的接入事件,然后循环select等待客户端的接入。

客户端接入后,在自己的线程内,selector返回调用后产生的选择键集合,之后遍历集合,判断事件,当是接入事件时,拿到客户端通道,再将客户端通道注册为可读事件到选择器上,再次select,等待数据的准备。

当数据准备好后,selector再次返回调用后产生的选择键集合,遍历集合,这次是可读事件,判断为可读事件后,读取客户端通道数据,并回写给客户端。

这样我们就实现了一个简单的echo服务器,但是我们现在需要给这个服务器加点功能,让它能更好的实现对客户端信息的回写。

增加需求

我们规定我们的echo服务器的实现需要有以下特点:

  1. 服务器原样返回客户端发送的信息。
  2. 客户端发送的信息以’\r’作为一个消息的结尾,一个消息的最大长度不超过128。
  3. 客户端会一次发送多个消息,服务端需按顺序原样返回。

我们先来分析一下需求。

针对需求第二三,客户端发送的消息需要以“/r”作为消息的结尾。所以为了把服务器的消息区分开来,我们不能以定长来处理数据了。同时我们需要考虑一下tcp的拆包和粘包。

什么是tcp的拆包和粘包呢?tcp是面向流的协议,我们无法知道一个数据包的边界,所以在接收数据时,可能会因为一次数据包过大而分次填充到socket缓冲区,这就是tcp的拆包;而多个数据包一起读取并填充到socket的缓冲区中,便称为tcp的粘包。

有了以上分析,我们可以得到以下代码:

public class MainDemo2 {
    //处理拿到可读事件的socket
    static class clientProcessor implements Runnable {
        private Selector selector;
        public clientProcessor(Selector selector) {
            this.selector = selector;
        }
        @Override
        public void run() {
            while (true) {
                try {
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        if (key.isValid() == false) {
                            continue;
                        }
                        if (key.isReadable()) {
                            SocketChannel clientChannel = (SocketChannel) key.channel();
                            ByteBuffer readBuffer = (ByteBuffer) key.attachment();
                            int read = clientChannel.read(readBuffer);
                            if (read == -1) {
                                key.cancel();
                                clientChannel.close();
                            } else {
                                readBuffer.flip();
                                int position = readBuffer.position();
                                int limit = readBuffer.limit();
                                List<ByteBuffer> buffers = new ArrayList<>();
                                for (int i = position; i < limit; i++) {
                                    if (readBuffer.get(i) == '\r') ;
                                    {
                                        ByteBuffer message = ByteBuffer.allocate(i - readBuffer.position() + 1);
                                        readBuffer.limit(i + 1);
                                        message.put(readBuffer);
                                        readBuffer.limit(limit);
                                        message.flip();
                                        buffers.add(message);
//                                        byte[] bytes = new byte[message.limit()];
//                                        message.get(bytes);
//                                        System.out.println(bytes);
                                    }
                                }
                                for (ByteBuffer message : buffers) {
                                //判断message是否有效
                                    while (message.hasRemaining()) {
                                        clientChannel.write(message);
                                    }
                                }
                                readBuffer.compact();
                            }

                        }
                        iterator.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) throws IOException {
        //新建服务端管道对象并设置为非阻塞
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(9999));
        //基于机器性能创建选择器数组
        final Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
        //初始化数组,依次启动线程
        for (int i = 0; i < selectors.length; i++) {
            final Selector selector = Selector.open();
            selectors[i] = selector;
            new Thread(new clientProcessor(selector)).start();
        }
        AtomicInteger id = new AtomicInteger();
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                iterator.next();
                SocketChannel socketChannel = serverSocketChannel.accept();
                Selector selectorChild = selectors[id.getAndIncrement() % selectors.length];
                socketChannel.register(selectorChild, SelectionKey.OP_READ, ByteBuffer.allocate(256));
                selectorChild.wakeup();
                iterator.remove();
            }
        }
    }
}

如上代码,我们比之前的实现做了一些改动。

首先在注册可读事件到选择器时,我们带上了一个256长度的ByteBuffer。这个ByteBuffer专门服务于这个这个选择器。因为粘包和拆包的存在,一次读取可能的数据中可能有多个消息,也可能不足一个消息,所以我们选择了用一个ByteBuffer去累计,这样每次读取也会考虑到上一次读取剩下的数据。

同时我们在实现消息读取的时候,不再是定长读取,而是循环检查字节,判断消息是否结尾。每得到一个消息就存入集合中,最后遍历集合依次返回客户端。

上面的代码已经实现了我们的需求,但是却是有缺点的。客户端在写出消息时,如果客户端缓冲区已满,消息无法写出,程序会一直循环等待,很消耗性能。下面我们再来改进一下代码。

代码改进

按照上述改进思路,我们回写消息的时候,再给消息注册写入事件,优化后的代码如下:

public class MainDemo3 {
    static class ChannelBuffer {
        ByteBuffer readBuffer;
        ByteBuffer[] writeBuffers;
        List<ByteBuffer> list = new LinkedList<>();
    }

    //处理拿到可读事件的socket
    static class clientProcessor implements Runnable {
        private Selector selector;

        public clientProcessor(Selector selector) {
            this.selector = selector;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        if (key.isValid() == false) {
                            continue;
                        }
                        if (key.isReadable()) {
                            SocketChannel clientChannel = (SocketChannel) key.channel();
                            ChannelBuffer channelBuffer = (ChannelBuffer) key.attachment();
                            ByteBuffer readBuffer = channelBuffer.readBuffer;
                            int read = clientChannel.read(readBuffer);
                            if (read == -1) {
                                key.cancel();
                                clientChannel.close();
                            } else {
                                readBuffer.flip();
                                int position = readBuffer.position();
                                int limit = readBuffer.limit();
                                List<ByteBuffer> buffers = new ArrayList<>();
                                for (int i = position; i < limit; i++) {
                                    if (readBuffer.get(i) == '\r') ;
                                    {
                                        ByteBuffer message = ByteBuffer.allocate(i - readBuffer.position() + 1);
                                        readBuffer.limit(i + 1);
                                        message.put(readBuffer);
                                        readBuffer.limit(limit);
                                        message.flip();
                                        buffers.add(message);
//                                        byte[] bytes = new byte[message.limit()];
//                                        message.get(bytes);
//                                        System.out.println(bytes);
                                    }
                                }
                                if (channelBuffer.writeBuffers == null) {
                                    ByteBuffer[] byteBuffers = buffers.toArray(new ByteBuffer[buffers.size()]);
                                    clientChannel.write(byteBuffers);
                                    boolean hasRemaining = hasRemaining(byteBuffers);
                                    if (hasRemaining) {
                                        channelBuffer.writeBuffers = byteBuffers;
                                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);

                                    }
                                } else {
                                    //还有尚未发送完全的数据,新产生的数据需要放入队列
                                    channelBuffer.list.addAll(buffers);
                                }
                                readBuffer.compact();
                            }
                        }
                        if (key.isWritable()){
                            SocketChannel clientChannel = (SocketChannel) key.channel();
                            ChannelBuffer channelBuffer = (ChannelBuffer) key.attachment();
                            ByteBuffer[]  writeBuffers  = channelBuffer.writeBuffers;
                            clientChannel.write(writeBuffers);
                            boolean hasRemaining = hasRemaining(writeBuffers);
                            if (hasRemaining==false){
                                channelBuffer.writeBuffers = null;
                                List<ByteBuffer> list = channelBuffer.list;
                                if (!list.isEmpty()){
                                    writeBuffers = list.toArray(new ByteBuffer[list.size()]);
                                    list.clear();
                                    clientChannel.write(writeBuffers);
                                    if (hasRemaining(writeBuffers))
                                    {
                                        //仍然有数据没有完全写出,保留对可写事件的关注
                                    }
                                    else
                                    {
                                        //没有数据要写出了,取消对可写事件的关注
                                        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                                    }
                                }
                                else
                                {
                                    //没有数据要写出了,取消对可写事件的关注
                                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                                }
                            }
                        }
                        iterator.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        private boolean hasRemaining(ByteBuffer[] byteBuffers) {
            boolean hasRemaining = false;
            for (ByteBuffer byteBuffer : byteBuffers) {
                if (byteBuffer.hasRemaining()) {
                    hasRemaining = true;
                    break;
                }
            }
            return hasRemaining;
        }
    }

    public static void main(String[] args) throws IOException {
        //新建服务端管道对象并设置为非阻塞
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(9999));
        //基于机器性能创建选择器数组
        final Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
        //初始化数组,依次启动线程
        for (int i = 0; i < selectors.length; i++) {
            final Selector selector = Selector.open();
            selectors[i] = selector;
            new Thread(new clientProcessor(selector)).start();
        }
        AtomicInteger id = new AtomicInteger();
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                iterator.next();
                SocketChannel socketChannel = serverSocketChannel.accept();
                Selector selectorChild = selectors[id.getAndIncrement() % selectors.length];
                ChannelBuffer channelBuffer = new ChannelBuffer();
                socketChannel.register(selectorChild, SelectionKey.OP_READ, channelBuffer);
                selectorChild.wakeup();
                iterator.remove();
            }
        }
    }
}

如上代码,如果在回写消息的时候,如果一次没有写出,不再循环判断等待,而是注册写入事件到选择器。我们用一个list来维护消息回写的有序性,如果上次的数据还没回写完成,则把此次数据添加的list中等待下一次写入。

在判断为写入事件的代码中,我们先对消息进行一次回写,如果回写消息的writeBuffers已经回写完,则开始回写list中的消息,否则结束事件,等待下一次写入。

本文地址:https://blog.csdn.net/sen_sen97/article/details/109587985