欢迎您访问程序员文章站,本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

一个NIO例子

程序员文章站 2022-04-24 10:50:05
...

例子完成的功能是,客户端发送命令time到服务端,服务端返回当前时间给客户端。

服务端逻辑代码

/**
 * Single-threaded NIO time server.
 *
 * <p>Listens on a TCP port with a {@link Selector}. When a client sends the exact
 * command {@code time}, the current date/time string is sent back; any other
 * input is answered with {@code 无效命令}. Commands are decoded, and replies are
 * encoded, as UTF-8.
 *
 * <p>All channel work happens on the thread that executes {@link #run()}.
 * {@link #stop()} may be called from any thread.
 */
public class MultiplexerTimeServerHandler implements Runnable {

    /** Charset used both for decoding commands and for encoding replies. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    private Selector selector = null;
    private ServerSocketChannel servChannel = null;
    private int port;
    // volatile: written by stop() on a caller thread, read by the run() loop thread.
    private volatile boolean stop;

    /**
     * Opens the selector, binds the listening channel to {@code port} (backlog 1024)
     * and registers it for accept events.
     *
     * <p>If any of these steps fails with an {@link IOException}, the stack trace is
     * printed and the JVM exits with status 1.
     *
     * @param port TCP port to listen on
     */
    public MultiplexerTimeServerHandler(int port) {
        this.port = port;
        try {
            selector = Selector.open();
            servChannel = ServerSocketChannel.open();
            // The selector can only manage non-blocking channels.
            servChannel.configureBlocking(false);
            servChannel.socket().bind(new InetSocketAddress(port), 1024);
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务器监听" + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Asks the event loop to terminate. Safe to call from any thread; the selector is
     * woken so the loop notices the request without waiting for its select timeout.
     */
    public void stop() {
        this.stop = true;
        if (selector != null) {
            selector.wakeup();
        }
    }

    /**
     * Event loop: waits for ready keys and dispatches them until {@link #stop()} is
     * called, then closes every registered channel and the selector.
     */
    @Override
    public void run() {
        try {
            while (!stop) {
                try {
                    selector.select(1000);
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        // Selected keys must be removed by hand or they are reported again.
                        it.remove();
                        try {
                            handleInput(key);
                        } catch (IOException e) {
                            // An I/O failure on one channel only tears down that channel.
                            closeKey(key);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            releaseResources();
        }
    }

    /** Cancels the key and closes its channel, reporting (not propagating) close errors. */
    private void closeKey(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes every channel still registered with the selector, then the selector.
     * Closing a selector only deregisters its channels; it does not close them, so
     * they have to be closed explicitly to free the listening port and client sockets.
     */
    private void releaseResources() {
        if (selector == null) {
            return;
        }
        if (selector.isOpen()) {
            // Snapshot the key set so closing channels cannot disturb the iteration.
            for (SelectionKey key : selector.keys().toArray(new SelectionKey[0])) {
                try {
                    key.channel().close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        try {
            selector.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Dispatches one ready key to the accept, read or pending-write handler.
     *
     * @param key a key taken from the selector's selected-key set
     * @throws IOException if the underlying channel operation fails
     */
    private void handleInput(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            acceptClient(key);
        } else if (key.isReadable()) {
            readCommand(key);
        } else if (key.isWritable()) {
            flushPending(key);
        }
    }

    /** Accepts a pending connection and registers it for read events. */
    private void acceptClient(SelectionKey key) throws IOException {
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        SocketChannel sc = ssc.accept();
        if (sc == null) {
            // Non-blocking accept() returns null when no connection is actually pending.
            return;
        }
        try {
            sc.configureBlocking(false);
            sc.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            // Drop only the broken client; the listening channel must stay open.
            e.printStackTrace();
            sc.close();
        }
    }

    /** Reads one command from a client channel and answers it. */
    private void readCommand(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer readBuff = ByteBuffer.allocate(1024);
        int read = sc.read(readBuff);
        if (read > 0) {
            readBuff.flip();
            byte[] bytes = new byte[readBuff.remaining()];
            readBuff.get(bytes);
            String body = new String(bytes, UTF_8);
            System.out.println("服务收到命令:" + body);
            String currentTime = "time".equals(body) ? new Date().toString() : "无效命令";
            doWrite(sc, currentTime);
        } else if (read < 0) {
            // Peer closed the connection.
            key.cancel();
            sc.close();
        }
        // read == 0: nothing available right now, wait for the next event.
    }

    /**
     * Sends {@code content} to the client as UTF-8. Null or blank content is ignored.
     *
     * <p>If the socket accepts only part of the reply, the remainder is attached to the
     * channel's key and the key is switched to write interest; reading resumes once the
     * remainder has been flushed.
     *
     * @param sc      client channel to write to
     * @param content reply text
     * @throws IOException if the write fails
     */
    private void doWrite(SocketChannel sc, String content) throws IOException {
        if (content == null || content.trim().isEmpty()) {
            return;
        }
        ByteBuffer out = ByteBuffer.wrap(content.getBytes(UTF_8));
        sc.write(out);
        if (out.hasRemaining()) {
            SelectionKey key = sc.keyFor(selector);
            if (key != null) {
                key.attach(out);
                key.interestOps(SelectionKey.OP_WRITE);
            }
        }
    }

    /** Continues a partially written reply; returns the key to read interest when done. */
    private void flushPending(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer pending = (ByteBuffer) key.attachment();
        if (pending != null) {
            sc.write(pending);
        }
        if (pending == null || !pending.hasRemaining()) {
            key.attach(null);
            key.interestOps(SelectionKey.OP_READ);
        }
    }
}

服务端启动代码

public class TimeServer {

    public static void main(String... args) throws IOException {
        int port = 8888;

        MultiplexerTimeServerHandler server = new MultiplexerTimeServerHandler(port);
        new Thread(server,"nio-MultiplexerTimeServerHandler-001").start();
    }
}

客户端逻辑代码

/**
 * NIO time client.
 *
 * <p>Connects to the server, sends the command {@code time}, prints the first reply
 * it receives and then terminates. The run also terminates when the server closes
 * the connection or any I/O error occurs; in every case the socket channel and the
 * selector are closed before {@link #run()} returns.
 */
public class TimeClientHandler implements Runnable {

    /** Charset used for encoding the command and decoding the reply. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    private SocketChannel socketChannel;
    private int port;
    private Selector selector;
    private String host;
    // Only touched by the thread executing run().
    private boolean stop;

    /**
     * Opens the selector and a non-blocking socket channel; the connection itself is
     * started later by {@link #run()}.
     *
     * <p>If the selector or channel cannot be opened, the stack trace is printed and
     * the JVM exits with status 1.
     *
     * @param host server host; {@code null} means {@code 127.0.0.1}
     * @param port server port
     */
    public TimeClientHandler(String host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;

        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Connects, then processes selector events until a reply has been printed, the
     * peer closes the connection, or an I/O error occurs.
     */
    @Override
    public void run() {
        try {
            doConnect();
            while (!stop) {
                selector.select(1000);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // Selected keys must be removed by hand or they are reported again.
                    it.remove();
                    handleInput(key);
                }
            }
        } catch (IOException e) {
            // There is a single connection; once it fails there is nothing left to
            // wait for, so report the error and fall through to cleanup.
            e.printStackTrace();
        } finally {
            closeResources();
        }
    }

    /**
     * Closes the socket channel and the selector. The channel is closed explicitly
     * because closing a selector only deregisters channels, it does not close them.
     */
    private void closeResources() {
        if (socketChannel != null) {
            try {
                socketChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Handles one ready key: completes a pending connect, or reads the server reply.
     *
     * @param key a key taken from the selector's selected-key set
     * @throws IOException if connecting, reading or writing fails
     */
    private void handleInput(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        SocketChannel sc = (SocketChannel) key.channel();
        if (key.isConnectable()) {
            // finishConnect() reports failure by throwing; a false return only means
            // the handshake is still in progress, so keep waiting for the next event.
            if (sc.finishConnect()) {
                sc.register(selector, SelectionKey.OP_READ);
                doWrite(sc);
            }
        } else if (key.isReadable()) {
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            int read = sc.read(readBuffer);
            if (read > 0) {
                readBuffer.flip();
                byte[] bytes = new byte[readBuffer.remaining()];
                readBuffer.get(bytes);
                String body = new String(bytes, UTF_8);
                System.out.println("现在时间为:" + body);
                this.stop = true;
            } else if (read < 0) {
                // Peer closed the connection: no reply will ever arrive, so stop.
                key.cancel();
                sc.close();
                this.stop = true;
            }
            // read == 0: nothing available right now, wait for the next event.
        }
    }

    /**
     * Starts the connection. If it completes immediately the command is sent right
     * away; otherwise the channel is registered for connect-completion events.
     */
    private void doConnect() throws IOException {
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        } else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    /** Sends the {@code time} command and logs success when it was written completely. */
    private void doWrite(SocketChannel socketChannel) throws IOException {
        ByteBuffer writeBuff = ByteBuffer.wrap("time".getBytes(UTF_8));
        socketChannel.write(writeBuff);
        if (!writeBuff.hasRemaining()) {
            System.out.println("客户端发送命令成功");
        }
    }
}

客户端启动代码

/**
 * Launcher for the NIO time client: connects to 127.0.0.1:8888 on a dedicated,
 * named thread.
 */
public class TimeClient {
    public static void main(String... args) throws IOException {
        final String serverHost = "127.0.0.1";
        final int serverPort = 8888;

        Runnable handler = new TimeClientHandler(serverHost, serverPort);
        Thread worker = new Thread(handler, "TimeClientHandler-001");
        worker.start();
    }
}