欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

互联网技术19——scoket编程中的AIO通信

程序员文章站 2022-07-11 11:53:26
...

传统的BIO建立连接需要三次握手,并且在服务器端创建一个线程取处理客户端请求,在NIO中,客户端channel通道注册到多路复用器上的,减少三次握手的过程,在服务器端只需要一个线程去轮询注册到多路复用器上的channel的状态位,根据不同状态执行不同操作。

 

JDK1.7之后,AIO在之前NIO基础上引入异步通道的概念,并提供了异步文件和异步套接字通道的实现,实现了异步非阻塞。AIO不需要通过多路复用器来对注册的通道进行轮询操作,即可实现异步读写,简化了NIO编程模型。相对于NIO中使用的SocketChannel、ServerScoketChannel,AIO中使用的是AsynchronnousScoketChannel、AsynchronnoousServerSocketCannel。

server.java

package com.SocketAio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by BaiTianShi on 2018/9/2.
 */
public class Server  {
    //线程池
    private ExecutorService executorService;

    //通道group
    private AsynchronousChannelGroup channelGroup;

    //异步服务器通道
    AsynchronousServerSocketChannel assc;

    public Server(int port) {
        try {

            //创建一个线程池,注意不要使用fixedThreadPool,这样只能接受有限个数量的并发客户端请求
            executorService = Executors.newCachedThreadPool();
            //创建异步channelGroup,1代表线程的数量
            channelGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService,1);

            //创建异步服务器通道
            assc = AsynchronousServerSocketChannel.open(channelGroup);

            //进行绑定监听端口
            assc.bind( new InetSocketAddress(port));
            System.out.println("服务端启动,端口为:"+port);

            //此处不是阻塞的,而是继续向下执行,进行通信的相关处理操作在ServerCompletionHandler
            assc.accept(this, new ServerCompletionHandler());
            //一直休眠,不让服务器线程停止
            Thread.sleep(Integer.MAX_VALUE);

        } catch (IOException e) {
            e.printStackTrace();
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Server ser = new Server(8765);
    }
}

在Server端创建一个缓存线程池,服务端使用的是AsynchronousServerSocketChannel,使用bind()方法绑定监听端口,使用如上述代码Server.java中的accept(this,new ServerCompletionHandler())接收和处理客户端的请求,但是这个accept是一个异步的操作,交给线程池去 异步处理当前这个客户端操作,而Server.java对应的主线程继续向下执行,所以在代码中使用Thread.sleep(Integer.MAX_VALUE);保持server对应的线程非关闭。

ServerCompletionHandler.java

package com.SocketAio;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;

/**
 * Created by BaiTianShi on 2018/9/2.
 */
public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,Server >{
    @Override
    public void completed(AsynchronousSocketChannel asc, Server attachment) {
        //当有一个客户端接入的时候,直接调用server的accept方法,
        attachment.assc.accept();
        read(asc);
    }

    //AsynchronousSocketChannel为客户端通道
    private void read(final AsynchronousSocketChannel asc) {
        //读取数据
        ByteBuffer buf = ByteBuffer.allocate(1024);
        //异步方法,不会阻塞在这,主程序继续执行下面操作
		/*This method initiates an asynchronous read operation to read a sequence of bytes
		from this channel into the given buffer. */
        asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer resultSize, ByteBuffer attachment) {
                //进行读取之后,重置标识位
                attachment.flip();
                //获得读取的字节数
                System.out.println("Server接收内容字节数:" + resultSize);
                //获取读取的数据
                String resultData = new String(attachment.array()).trim();
                System.out.println("Server接收到的内容:" + resultData);
                String response = "收到数据" + resultData;
                write(asc, response);
            }
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
            }
        });
    }

    private void write(AsynchronousSocketChannel asc, String response) {
        try {
            ByteBuffer buf = ByteBuffer.allocate(1024);
            buf.put(response.getBytes());
            buf.flip();
			/*This method initiates an asynchronous write operation to write a sequence of bytes
			to this channel from the given buffer. */
            //使用到多线程设计模式中的Future,先返回一个Future代理对象。后台新启动一个线程A,进行数据的写操作。调用get()方法时才真正获取线程A执行任务的结果
            asc.write(buf).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void failed(Throwable exc, Server attachment) {
        exc.printStackTrace();
    }

}

client.java

package com.SocketAio;

import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;

/**
 * Created by BaiTianShi on 2018/9/2.
 */
public class Client implements Runnable{

    private AsynchronousSocketChannel asc ;

    public Client() throws Exception {
        asc = AsynchronousSocketChannel.open();
    }

    public void connect(){
        asc.connect(new InetSocketAddress("127.0.0.1", 8765));
    }

    public void write(String content){
        try {
            asc.write(ByteBuffer.wrap(content.getBytes())).get();//调用get()方法异步写
            read();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void read() {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            asc.read(buf).get();
            buf.flip();
            byte[] respByte = new byte[buf.remaining()];
            buf.get(respByte);
            System.out.println("客户端接收到的反馈信息:"+new String(respByte,"utf-8").trim());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while(true){

        }
    }

    public static void main(String[] args) throws Exception {
        Client c1 = new Client();
        c1.connect();
        Client c2 = new Client();
        c2.connect();
        Client c3 = new Client();
        c3.connect();

        new Thread(c1, "c1").start();
        new Thread(c2, "c2").start();
        new Thread(c3, "c3").start();

        Thread.sleep(1000);

        c1.write("this is c1");
        c2.write("this is c2");
        c3.write("this is c3");
    }

}

 

启动client之前的server端

互联网技术19——scoket编程中的AIO通信

启动client之后的server端

互联网技术19——scoket编程中的AIO通信

client端

互联网技术19——scoket编程中的AIO通信

阻塞与非阻塞、同步与异步

阻塞:应用程序在获取网络数据的时候,如果网络传输数据很慢,那么程序就一直等待,直到数据传输完毕为止

非阻塞:应用程序直接可以获取已准备就绪好的数据,无需等待。

BIO为同步阻塞形式,NIO为同步非阻塞形式。在JDK1.7之后,升级了NIO包,支持异步非阻塞通信模型NI02.0(AIO)

同步:应用程序会直接参与IO读写操作,并直接阻塞到某一个方法上,直到数据准备就绪;或者采用轮询的策略实时监测数据的就绪状态,如果就绪则获取数据。

异步:所有的IO读写操作交给操作系统处理,与应用程序没有直接关系。当操作系统完成了IO读写操作时,会给应用程序发通知,应用程序直接拿走数据即可。

 

BIO、NIO、AIO三者的区别

BIO:它属于传统的Socket编程,客户端与服务端连接的建立需要TCP的三次握手。服务端ServerSocket首先启动,指定端口并执行accepet进行阻塞,监听客户端的连接请求。若接收到客户端的连接请求并成功建立连接,客户端与服务端通过Socket套接字中的数据流进行相互之间的数据通信。针对每一个成功建立的连接,服务端都会创建一个线程去处理这个客户端请求,若建立的连接的客户端规模很大,对服务器的资源是一种严重浪费。

NIO:在NIO中引入了Channel通道,Buffer缓冲区、Selector多路复用器的概念。客户端SocketChannel与服务端ServerChannel都需要在Selector多路复用器上进行注册。服务器端会创建一个线程对注册到Selector多路复用器上的所有Channel进行轮询,轮序出处于就绪状态的Channel集合,根据为每个Channel分配的唯一key,获取具体的channel,并根据其状态标志位,进行处理,冲Channel中读取或写入数据,写到Buffer数据缓冲区。每个管道都会对selector进行注册不同的事件状态,方便selector查找,事件状态包括:SelectorKey.OP_CPMMECT连接状态、SelectionKey.OP_ACCEPT阻塞状态、SelectionKey.OP_READ可读状态、SelectionKey.OP_WRITE可写状态

AIO:使用线程池中的线程来处理客户端的请求,针对每一个客户端的请求,都会创建一个处理该任务的对象,如上面ServerComptionHandler类的对象,来完成读、写任务。AIO真正实现了异步非阻塞。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

相关标签: Scoket