欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

互联网技术18——socket编程之NIO

程序员文章站 2022-03-18 19:45:11
...

NIO同步非阻塞

NIO是同步非阻塞的,在传统的TCP点对点直接连接的基础上做了一层封装,并不是Client与Server直接建立连接,而是Client先到Server端进行管道注册。在Server端创建一个Selector多路复用器,启动一个线程轮询注册到Selector上的所有Channerl的状态,根据通道的状态,执行相关操作。通道的状态包括: Connect连接状态、Accept阻塞状态、Read可读状态、Write可写状态。

NIO编程3个重要部分:Buffer缓冲区、Channel管道、Selecter多路复用器。

 

Buffer缓冲区:本质上是可以存储数据的内存,被封装成了Buffer对象而已。

       (   本部分借鉴文章地址:https://www.cnblogs.com/tengpan-cn/p/5809273.html  )

缓冲区类型:

  1. ByteBuffer 
  2. MappedByteBuffer
  3. CharBuffer
  4. DoubleBuffer
  5. FloatBuffer
  6. IntBuffer
  7. LongBuffer
  8. ShortBuffer

常用方法

  1. allocate() 分配一块缓冲区
  2. put() 向缓冲区写数据
  3. get() 从缓冲区读数据,  其中get()方法会将position加1,get(index)不会改变position的大小,
  4. filp() 将缓冲区从写模式切换到读模式
  5. clear() 将缓冲区从读模式切换到写模式,不会清空数据,但是后续写数据会覆盖元数据,即使有数据没被读,也会被遗忘
  6. compat() 从读数据切换到写模式,数据不会清空,会将所有未读的数据copy到缓冲区头部,后续写数据不会覆盖,而是在这些数据之后写数据,
  7. mark() 对position做出标记,配合reset使用
  8. reset() 将position置为标记值
  9. duplicate() 复制一分buffer缓冲区

缓冲区的属性 

  1. capacity 缓冲区的大小,无论是读写哪种模式,属性不会变
  2.  position 写数据时:position表示当前写的位置,每写一个数据,会向下移动一个数据单元。初始值为0,最大值为capacity-1,读数据时:position会被置0,表示当前读的位置
  3. limit  写模式下,limit相当capacity,表示最多能写多少数据,切换到读模式时,limit等于原先写操作时的position,表示最多可以读多少数据

 

BufferTest.java

package com.socketNio;

import com.DisruptorLiuBianXing.Main;

import java.nio.IntBuffer;

/**
 * Created by BaiTianShi on 2018/8/30.
 */
public class bufferTest {
    public static void main(String[] args) {
        //allocate  分配一块空间
        IntBuffer intBuffer = IntBuffer.allocate(10);
        intBuffer.put(1);
        intBuffer.put(2);
        intBuffer.put(3);
        intBuffer.put(4);
        intBuffer.put(5);
        //写模式下,position记录的是当前写入的位置
        System.out.println("intBuffer"+intBuffer);
        //切换到读模式,将position置0
        intBuffer.flip();
        System.out.println("intBuffer执行flip后"+intBuffer);
        System.out.println("pos"+intBuffer.position());
        System.out.println("lim"+intBuffer.limit());
        System.out.println("cap"+intBuffer.capacity());

        //执行get(index)不会改变position 的大小
        System.out.println("执行intBuffer.get(3)"+intBuffer.get(3));
        System.out.println("执行intBuffer.get(3)后"+intBuffer);

        //执行put(index,value)不会改变position 的大小
        intBuffer.put(2,33);
        System.out.println("intBuffer执行put(2,33)后"+intBuffer);

        //执行get()方法,无索引参数,会改变position大小
        for (int i = 0; i < intBuffer.limit(); i++) {
            System.out.print(intBuffer.get()+"\t");
        }
        System.out.println();

        System.out.println("intBuffer使用循环遍历之后"+intBuffer);
        System.out.println("pos"+intBuffer.position());
        System.out.println("lim"+intBuffer.limit());
        System.out.println("cap"+intBuffer.capacity());

        System.out.println("--------warp包裹数组---------");

        //通过warp包裹数组声明的缓冲区,改变缓冲区中的内容会同时改变原数组中的内容
        int[] array = new int[]{6,7,8,9,10};
        IntBuffer warpBuffer = IntBuffer.wrap(array);

        System.out.println("warpBuffer"+warpBuffer);
        for (int i = 0; i < warpBuffer.limit(); i++) {
            System.out.print(warpBuffer.get()+"\t");
        }
        System.out.println();
        System.out.println("wrapIntBuffer使用for循环遍历之后: " + warpBuffer);

        warpBuffer.flip();
        warpBuffer.put(2,88);
        System.out.println("pos"+warpBuffer.position());
        System.out.println("lim"+warpBuffer.limit());
        System.out.println("cap"+warpBuffer.capacity());

        System.out.println("使用for循环遍历");
        for (int i = 0; i < warpBuffer.limit(); i++) {
            System.out.print(warpBuffer.get()+"\t");
        }
        System.out.println();
        System.out.println("被warp包裹的array内容发生了改变");
        for (int i = 0; i < array.length ; i++) {
            System.out.print(array[i]+"\t");
        }

        //复制方法
        System.out.println();
        System.out.println("------复制方法------");
        IntBuffer intBufferOne = IntBuffer.allocate(10);
        intBufferOne.put(array);
        System.out.println("intBufferOne:"+intBufferOne);
        intBufferOne.flip();
        System.out.println("使用for循环遍历");
        for (int i = 0; i < intBufferOne.limit(); i++) {
            System.out.print(intBufferOne.get()+"\t");
        }

        IntBuffer intBufferTwo = intBufferOne.duplicate();
        System.out.println();
        System.out.println("intBufferTwo:"+intBufferTwo);
        System.out.println("可读数据为:" + intBufferTwo.remaining());//limit - position
        intBufferTwo.position(2);
        System.out.println("intBufferTwo:"+intBufferTwo);
        System.out.println("可读数据为:" + intBufferTwo.remaining());//limit - position


    }
}

      

Channel通道:

1. 网络数据通过Channel通道读取和写入,通与流的不同之处在于通道是双向的,而流只能在一个方向上移动,用到可以读、写或同时进行。channerl通道可以与多路复用器结合起来,有多重状态位,方便多路福通气识别并执行操作。

2. channel分为两大类,一类是网络读写的selectableChannel,一类是用于文件操作的FileChannel。SocketChannel和SocketServerChannel都是SelectableChannerl的子类。

 

selector多路复用器:

它是NIO编程的基础,提供选择已经就绪任务的能力,当io事件(管道)注册到选择器后,seletor会分配给每个管道一个key值。seletor会不断的轮询注册在其上的通道channel,如果某个通道发生了读写操作,这个管道就处于就绪状态,会被selevtor轮询出来,然后通过selectionKey可以取得就绪的channel集合,从而进行io操作。宇哥selector可以辅助成千上万的channel通道,jdk使用epoll代替了传统的select实现,使得获取连接句柄没有限制,只需要一个线程负责selector轮询,就可以接入成千上万的客户端。

下面代码中,在server类的构造方法中,创建ServerSocketChannel对象,将该对象注册到多路复用器Sekector上,并处于阻塞ACCEPT状态。由于server实现了runnable接口,在run方法中while(true)循环,在while循环体中,不论客户端channel还是服务器channel,都在selector轮询范围内。在轮询过程中,获取所有注册到多路复用器selector上的key,在这个while首次执行的时候,获取到的处于阻塞状态的channel一定是我们启动服务端是注册的服务端管道ScoketServerChannel,这个服务端channel执行accept方法,监听处于就绪状态的客户端channel,将客户端channel通道注册到多路复用器select上,并监听器读标识位。在存在的客户端channel注册到selector的情况下,在while循环体重,如客户端key处于key.isReadable()为true是,就会执行read方法,结合byteBUffer的一系列方法,将channel中的数据读取到缓冲区中。

每一次从客户端发送数据,到服务端获取到数据,需要两个发现的过程,第一次:发现是服务端的accept方法监听服务端连接并准备就绪事件,发现后将此客户端在多路复用器selector上的状态标识位变为可读,即:发现并标识可读,第二次:当while()进行下一次循环的时候,再拿到这个channel时已经被标识为可读的了,这样因为其实可读状态,就可以直接读取了,即:读可读的channel;服务端可通信端原理相同,

 

综合使用Buffer、Channel、Selector的Client端与Server端双向通信示例

Server.java

package com.socketNio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.channels.*;
import java.util.Iterator;

/**
 * Created by BaiTianShi on 2018/8/31.
 *
 */
public class Server implements Runnable{
    //多路复用器,管理所有的管道
    private Selector selector;

    //建立读缓冲区,大小为1024
    private ByteBuffer readBuf = ByteBuffer.allocate(1024);

    //建立写缓存区
    private ByteBuffer writeBuf = ByteBuffer.allocate(1024);

    public Server(int port) {
        try {
            //打开多路复用器
            this.selector = Selector.open();
            //打开服务通道
            ServerSocketChannel ssc = ServerSocketChannel.open();
            //设置服务器为非阻塞模式
            ssc.configureBlocking(false);

            ssc.bind(new InetSocketAddress(port));
            //把服务器通道注册到多路复用器上,并且监听阻塞事件
            ssc.register(selector,SelectionKey.OP_ACCEPT);
            System.out.println("server start port:"+port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while (true){
            try {
                //多路复用器开始监听
                this.selector.select();
                //返回多路复用器已经选择的结果,
                System.out.println("轮序已经就绪的管道");
                //t通过注释掉137行,跟踪断点可证明,当多路复用器selector中有channel状态改变时,才会被这个selectedKeys方法获取到
                Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
                System.out.println("发现就绪的channel管道");
                while (keys.hasNext()){
                    //获取一个选择元素
                    SelectionKey key = keys.next();
                    //直接从容器中移除
                    keys.remove();
                    //如果是有效的
                    if(key.isValid()){
                        //如果是阻塞的状态,一定是最开始注册的服务端通道
                        if(key.isAcceptable()){
                            this.accept(key);
                        }
                        //如果是可读状态
                        if(key.isReadable()){
                            this.read(key);

                        }
                        //如果是可写状态
                        if(key.isWritable()){
                            this.write(key);

                        }
                    }

                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


    //向socketChannel中写数据
    private void write(SelectionKey key){
        SocketChannel sc = (SocketChannel) key.channel();
        //定义一个字节数据
        byte[] bytes = new byte[1024];
        try {
            //使用系统录入功能,等待用户输入数据并回车
            System.in.read(bytes);
            //把系统录入的数据防盗缓冲区
            writeBuf.put(bytes);
            //对缓冲区进行复位
            writeBuf.flip();
            //将缓冲区的数据写给client端
            sc.write(writeBuf);
            //清空缓冲区
            writeBuf.clear();
            //因以执行了想socketChannel的写操作,这里想select注册sc通道的读事件状态,
            sc.register(selector,SelectionKey.OP_READ);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //读socketChannel中的数据
    private void read(SelectionKey key){
        try {
            //清空缓冲区
            this.readBuf.clear();
            //通过key获取之前注册的通道
            SocketChannel sc = (SocketChannel) key.channel();
            //读取数据到缓冲区
            int count = sc.read(this.readBuf);

            //如果没有数据
            if(count == -1){
                //关闭
                key.channel().close();
                //取消此key绑定的socketChannel的注册
                key.cancel();
                return;
            }
            //有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
			/*Flips this buffer. The limit is set to the current position and then
			the position is set to zero. If the mark is defined then it is discarded.*/
            this.readBuf.flip();
            //根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
            byte[] bytes = new byte[this.readBuf.remaining()];//this.readBuf.remaining()可用数据个数
            //接收缓冲区数据到字节数组
            this.readBuf.get(bytes);
            //打印结果
            String body = new String(bytes).trim();
            System.out.println("服务器端接收到客户端发送的信息 : " + body);
            //因已经执行了向SocketChannel的读操作,这里向selector注册sc通道的写事件状态
            sc.register(this.selector,SelectionKey.OP_WRITE);

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    //如果是阻塞的状态,一定是最开始注册的服务端通道
    private void accept(SelectionKey key){
        try {
            //最开始注册的服务端通道
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            //这是方法,直到客户端有通道注册,才能返回客户端的通道
            SocketChannel sc = ssc.accept();
            //将客户端设置为阻塞模式
            sc.configureBlocking(false);
            //将客户端通道注册搭配多路复用器上,并设置为读取标识
            sc.register(this.selector,SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }


    }

    public static void main(String[] args) {
        new  Thread( new Server(8765)).start();
    }

}

在Server.java中,因为仅仅ServerSocketChannel对象在Selector上注册了SelectionKey.OP_ACCEPT事件状态,因此Server端创建的一个线程,在轮询Selector过程中,获取处于就绪状态的所有Channel通道的集合。Selector分配给ServerSocketChannel对象的唯一key,这个key.isAcceptable()为true则执行accept(key)方法,使这个key对应的服务器端Channel一直处于accept监听状态。

 

Client.java

package com.socketNio;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * Created by BaiTianShi on 2018/9/1.
 */
public class Client implements Runnable {
    //多路复用器,管理所有通道
    private Selector selector;

    //写缓冲区
    private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
    //读缓冲区
    private ByteBuffer readBuf = ByteBuffer.allocate(1024);

    //建立连接地址
    InetSocketAddress address = new InetSocketAddress("127.0.0.1",8765);

    public Client() {

        try {
            //打开多路复用器
            this.selector = Selector.open();

            //打开客户端通道
            SocketChannel sc = SocketChannel.open();
            //客户端通道为非阻塞模式
            sc.configureBlocking(false);

            //注册到多路复用器selector上,给sc注册connect事件状态
            sc.register(selector, SelectionKey.OP_CONNECT);

            //进行连接
            sc.connect(address);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        SocketChannel socketChannel;
        while (true){
            try {
                //要让多路复用器开始监听
                this.selector.select();

                //返回多路复用器已经选择的结果
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                //遍历
                while (keys.hasNext()){
                    //获取一个选择的元素
                    SelectionKey key = keys.next();
                    //直接容器中移除
                    keys.remove();

                    //如果是有效的
                    if(key.isValid()){
                        //如果为连接状态,
                        if(key.isConnectable()){
                            System.out.println("client connect");
                            socketChannel =(SocketChannel)key.channel();
							/*Returns:
								true if, and only if, a connection operation has been initiated on
								this channel but not yet completed by invoking the finishConnect method*/
                            if(socketChannel.isConnectionPending()){
                                socketChannel.finishConnect();
                                System.out.println("客户端完成连接操作!");
                                //把数据放到缓冲区中
                                writeBuf.put("Hello,Server".getBytes());
                                //对缓冲区进行复位
                                writeBuf.flip();
                                //写出数据给Server端
                                socketChannel.write(writeBuf);
                                //清空写缓冲区
                                writeBuf.clear();

                            }
                            socketChannel.register(selector, SelectionKey.OP_READ);

                        }
                        //如果为可读状态
                        if(key.isReadable()){
                            this.read(key);
                        }
                        //如果为可写状态
                        if(key.isWritable()){
                            this.write(key);
                        }
                    }

                }

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


    private void write(SelectionKey key){
        try {
            SocketChannel sc =  (SocketChannel) key.channel();
            byte[] bytes = new byte[1024];
            System.in.read(bytes);
            //把数据放到缓冲区中
            writeBuf.put(bytes);
            //对缓冲区进行复位
            writeBuf.flip();
            //写出数据给Server端
            sc.write(writeBuf);
            //清空缓冲区数据
            writeBuf.clear();
            sc.register(this.selector, SelectionKey.OP_READ);
        } catch (ClosedChannelException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void read(SelectionKey key) {
        try {
            //清空缓冲区旧的数据
            this.readBuf.clear();
            //获取之前注册的socket通道对象
            SocketChannel sc = (SocketChannel) key.channel();
            //读取数据到缓冲区
            int count = sc.read(this.readBuf);
            //如果没有数据
            if(count == -1){
                key.channel().close();
                key.cancel();
                return;
            }
            //有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
            this.readBuf.flip();
            //根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
            byte[] bytes = new byte[this.readBuf.remaining()];//this.readBuf.remaining()可用数据个数
            //接收缓冲区数据到字节数组
            this.readBuf.get(bytes);
            // 打印结果
            String body = new String(bytes).trim();
            System.out.println("客户端接收到服务器端返回的信息 : " + body);
            sc.register(this.selector, SelectionKey.OP_WRITE);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        try {
            Client client=new Client();
            new Thread(client).start();//单独启动一个线程,去轮询注册到多路复用器上的所有通道
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

启动server端

互联网技术18——socket编程之NIO

启动client端后server端控制台,第一遍轮序为发现通过accept的服务端发现就绪的客户端,并置为可读状态。第二遍轮询为重新拿到刚刚置为可读的客户端读取数据,并置为可写。第三遍循环为拿到刚刚可写的客户端,然后准备写数据(待键盘输入)

互联网技术18——socket编程之NIO

客户端控制台:当服务端没有回复数据的时候,一直阻塞在 this.selector.select();

互联网技术18——socket编程之NIO

服务端手动输入数据,绿色的输入内容我写错了,应该是服务端,

互联网技术18——socket编程之NIO

客户端手动输入数据

互联网技术18——socket编程之NIO

 

 

 

 

 

 

 

 

 

 

 

 

 

相关标签: socket