欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【NIO详解】基于NIO的client与server

程序员文章站 2022-06-14 10:18:01
...

服务端代码

public class Server_Test {
    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                port = 8080;
            }
        }

        MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
        new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
    }
}

class MultiplexerTimeServer implements Runnable {
    private Selector selector;
    private ServerSocketChannel servChannel;
    private volatile boolean stop;

    //初始化多路复用器,绑定监听端口
    public MultiplexerTimeServer(int port) {
        try {
            //打开多路复用器
            selector = Selector.open();
            //打开服务器通道
            servChannel = ServerSocketChannel.open();
            //设置服务器通道为非阻塞模式
            servChannel.configureBlocking(false);
            //绑定端口,backlog指队列的容量,提供了容量限制的功能,避免太多客户端占用太多服务器资源
            //serverSocketChannel有一个队列,存放没有来得及处理的客户端,服务器每次accept,就会从队列中去一个元素。
            servChannel.socket().bind(new InetSocketAddress(port), 1024);
            //把服务器通道注册到多路复用器上,并监听阻塞事件
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The server is start in port: " + port);

        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }

    }

    public void stop() {
        this.stop = true;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                //多路复用器开始工作(轮询),选择已就绪的通道
                //等待某个通道准备就绪时最多阻塞1秒,若超时则返回。
                selector.select(1000);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (IOException e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会自动去注册并关闭
        //所以不需要重复释放资源
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            if (key.isAcceptable()) {
                //获取服务器通道
                ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
                //执行阻塞方法(等待客户端资源)
                SocketChannel sc = ssc.accept();
                //设置为非阻塞模式
                sc.configureBlocking(false);
                //注册到多路复用器上,并设置为可读状态
                sc.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()) {
                //读取数据
                SocketChannel sc = (SocketChannel)key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    //反转缓冲区(复位)
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    //接受缓冲区数据
                    readBuffer.get(bytes);
                    //trim方法返回字符串的副本,忽略前导空白和尾部空白
                    String body = new String(bytes).trim();
//                    String body = new String(bytes, "UTF-8");
                    System.out.println("The time server receive order : " + body);
//                    String currentTime = "QUERY TIME ORDER"
//                            .equalsIgnoreCase(body) ? new java.util.Date
//                            (System.currentTimeMillis()).toString()
//                            : "BAD ORDER";
                    String currentTime = new Date(System.currentTimeMillis()).toString();
                    //给客户端回写数据
                    doWrite(sc, currentTime);
                } else if (readBytes < 0) {
                    //对端链路关闭
                    key.cancel();
                    sc.close();
                }

            }
        }
    }

    private void doWrite(SocketChannel channel, String response) throws IOException{
        if (response != null && response.trim().length() > 0) {
            System.out.println(response);
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }


}

程序看烦了吧!来看看这张图帮助你理解程序

服务端通信序列图:

【NIO详解】基于NIO的client与server

其实,主要步骤可以分为

  • Selector.open():打开一个Selector。
  • ServerSocketChannel.open():创建服务端的Channel。
  • bind():绑定到某个端口上。并配置非阻塞模式。
  • register():注册Channel和关注的事件到Selector上。
  • select():拿到已经就绪的事件。


客户端代码

public class Client_Test {
    public static void main(String[] args) throws UnknownHostException {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                port = 8080;
            }
        }
        new Thread(new TimeClientHandle("127.0.0.1", port), "Time-Client-001").start();
    }
}
class TimeClientHandle implements Runnable{
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    //默认boolean值为false
    private volatile boolean stop;

    public TimeClientHandle(String host, int port) {
        //host若为空,则设置为指定ip
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
            //打开多路复用器
            selector = Selector.open();
            //打开管道
            socketChannel = SocketChannel.open();
            //设置管道为非阻塞模式
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }

    }

    @Override
    public void run() {
        try {
            doConnect();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                //阻塞等待1s,若超时则返回
                selector.select(1000);
                //获取所有selectionkey
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                //遍历所有selectionkey
                Iterator<SelectionKey> it = selectionKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    //获取之后删除
                    it.remove();
                    try {
                        //处理该selectionkey
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            //取消selectionkey
                            key.cancel();
                            if (key.channel() != null) {
                                //关闭该通道
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
        if (selector != null) {
            try {
                //关闭多路复用器
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void handleInput(SelectionKey key) throws IOException{
        //若该selectorkey可用
        if (key.isValid()) {
            //将key转型为SocketChannel
            SocketChannel sc = (SocketChannel) key.channel();
            //判断是否连接成功
            if (key.isConnectable()) {
                //若已经建立连接
                if (sc.finishConnect()) {
                    //向多路复用器注册可读事件
                    sc.register(selector, SelectionKey.OP_READ);
                    //向管道写数据
                    doWrite(sc);
                }else {
                    //连接失败 进程退出
                    System.exit(1);
                }
            }

            //若是可读的事件
            if (key.isReadable()) {
                //创建一个缓冲区
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                System.out.println("before  :  "+readBuffer);
                //从管道中读取数据然后写入缓冲区中
                int readBytes = sc.read(readBuffer);
                System.out.println("after :  "+readBuffer);
                //若有数据
                if (readBytes > 0) {
                    //反转缓冲区
                    readBuffer.flip();
                    System.out.println(readBuffer);

                    byte[] bytes = new byte[readBuffer.remaining()];
                    //获取缓冲区并写入字节数组中
                    readBuffer.get(bytes);
                    //将字节数组转换为String类型
                    String body = new String(bytes);
                    System.out.println(body.length());
                    System.out.println("Now is : " + body + "!");
                    this.stop = true;
                } else if (readBytes < 0) {
                    key.cancel();
                    sc.close();
                } else {
                    sc.register(selector, SelectionKey.OP_READ);
                }
            }
        }
    }
    public void doConnect() throws IOException {
        //通过ip和端口号连接到服务器
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            //向多路复用器注册可读事件
            socketChannel.register(selector, SelectionKey.OP_READ);
            //向管道写数据
            doWrite(socketChannel);
        } else {
            //若连接服务器失败,则向多路复用器注册连接事件
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }
    private void doWrite(SocketChannel sc) throws IOException {
        //要写的内容
        byte[] req = "    -    QUERY TIME ORDER     -   ".getBytes();
        //为字节缓冲区分配指定字节大小的容量
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        //将内容写入缓冲区
        writeBuffer.put(req);
        //反转缓冲区
        writeBuffer.flip();
        //输出打印缓冲区的可读大小
        System.out.println(writeBuffer.remaining());
        //将内容写入管道中
        sc.write(writeBuffer);
        if (!writeBuffer.hasRemaining()) {
            //若缓冲区中无可读字节,则说明成功发送给服务器消息
            System.out.println("Send order 2 server succeed.");
        }
    }

}

程序看烦了吧!来看看这张图帮助你理解程序

客户端通信序列图:

【NIO详解】基于NIO的client与server



服务端运行结果:

【NIO详解】基于NIO的client与server



客户端运行结果

【NIO详解】基于NIO的client与server

小结:

首先恭喜你看完了“这么麻烦”的CS代码,和BIO相比,NIO的代码确实复杂了很多,但是不是就意味着我们必须要编写这么复杂的业务代码呢?答案是否定的,我感觉直接使用NIO编程,容易出错,尤其是要读写转换时要反转缓冲区,那么一个好的解决方法油然而生,那就是Netty,一个基于事件驱动的网络框架。

相关标签: nio server