互联网技术19——scoket编程中的AIO通信
传统的BIO建立连接需要三次握手,并且在服务器端创建一个线程取处理客户端请求,在NIO中,客户端channel通道注册到多路复用器上的,减少三次握手的过程,在服务器端只需要一个线程去轮询注册到多路复用器上的channel的状态位,根据不同状态执行不同操作。
JDK1.7之后,AIO在之前NIO基础上引入异步通道的概念,并提供了异步文件和异步套接字通道的实现,实现了异步非阻塞。AIO不需要通过多路复用器来对注册的通道进行轮询操作,即可实现异步读写,简化了NIO编程模型。相对于NIO中使用的SocketChannel、ServerScoketChannel,AIO中使用的是AsynchronnousScoketChannel、AsynchronnoousServerSocketCannel。
server.java
package com.SocketAio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by BaiTianShi on 2018/9/2.
*/
public class Server {
//线程池
private ExecutorService executorService;
//通道group
private AsynchronousChannelGroup channelGroup;
//异步服务器通道
AsynchronousServerSocketChannel assc;
public Server(int port) {
try {
//创建一个线程池,注意不要使用fixedThreadPool,这样只能接受有限个数量的并发客户端请求
executorService = Executors.newCachedThreadPool();
//创建异步channelGroup,1代表线程的数量
channelGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService,1);
//创建异步服务器通道
assc = AsynchronousServerSocketChannel.open(channelGroup);
//进行绑定监听端口
assc.bind( new InetSocketAddress(port));
System.out.println("服务端启动,端口为:"+port);
//此处不是阻塞的,而是继续向下执行,进行通信的相关处理操作在ServerCompletionHandler
assc.accept(this, new ServerCompletionHandler());
//一直休眠,不让服务器线程停止
Thread.sleep(Integer.MAX_VALUE);
} catch (IOException e) {
e.printStackTrace();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Server ser = new Server(8765);
}
}
在Server端创建一个缓存线程池,服务端使用的是AsynchronousServerSocketChannel,使用bind()方法绑定监听端口,使用如上述代码Server.java中的accept(this,new ServerCompletionHandler())接收和处理客户端的请求,但是这个accept是一个异步的操作,交给线程池去 异步处理当前这个客户端操作,而Server.java对应的主线程继续向下执行,所以在代码中使用Thread.sleep(Integer.MAX_VALUE);保持server对应的线程非关闭。
ServerCompletionHandler.java
package com.SocketAio;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
/**
* Created by BaiTianShi on 2018/9/2.
*/
public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,Server >{
@Override
public void completed(AsynchronousSocketChannel asc, Server attachment) {
//当有一个客户端接入的时候,直接调用server的accept方法,
attachment.assc.accept();
read(asc);
}
//AsynchronousSocketChannel为客户端通道
private void read(final AsynchronousSocketChannel asc) {
//读取数据
ByteBuffer buf = ByteBuffer.allocate(1024);
//异步方法,不会阻塞在这,主程序继续执行下面操作
/*This method initiates an asynchronous read operation to read a sequence of bytes
from this channel into the given buffer. */
asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer resultSize, ByteBuffer attachment) {
//进行读取之后,重置标识位
attachment.flip();
//获得读取的字节数
System.out.println("Server接收内容字节数:" + resultSize);
//获取读取的数据
String resultData = new String(attachment.array()).trim();
System.out.println("Server接收到的内容:" + resultData);
String response = "收到数据" + resultData;
write(asc, response);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
private void write(AsynchronousSocketChannel asc, String response) {
try {
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put(response.getBytes());
buf.flip();
/*This method initiates an asynchronous write operation to write a sequence of bytes
to this channel from the given buffer. */
//使用到多线程设计模式中的Future,先返回一个Future代理对象。后台新启动一个线程A,进行数据的写操作。调用get()方法时才真正获取线程A执行任务的结果
asc.write(buf).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Server attachment) {
exc.printStackTrace();
}
}
client.java
package com.SocketAio;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
/**
* Created by BaiTianShi on 2018/9/2.
*/
public class Client implements Runnable{
private AsynchronousSocketChannel asc ;
public Client() throws Exception {
asc = AsynchronousSocketChannel.open();
}
public void connect(){
asc.connect(new InetSocketAddress("127.0.0.1", 8765));
}
public void write(String content){
try {
asc.write(ByteBuffer.wrap(content.getBytes())).get();//调用get()方法异步写
read();
} catch (Exception e) {
e.printStackTrace();
}
}
private void read() {
ByteBuffer buf = ByteBuffer.allocate(1024);
try {
asc.read(buf).get();
buf.flip();
byte[] respByte = new byte[buf.remaining()];
buf.get(respByte);
System.out.println("客户端接收到的反馈信息:"+new String(respByte,"utf-8").trim());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(true){
}
}
public static void main(String[] args) throws Exception {
Client c1 = new Client();
c1.connect();
Client c2 = new Client();
c2.connect();
Client c3 = new Client();
c3.connect();
new Thread(c1, "c1").start();
new Thread(c2, "c2").start();
new Thread(c3, "c3").start();
Thread.sleep(1000);
c1.write("this is c1");
c2.write("this is c2");
c3.write("this is c3");
}
}
启动client之前的server端
启动client之后的server端
client端
阻塞与非阻塞、同步与异步
阻塞:应用程序在获取网络数据的时候,如果网络传输数据很慢,那么程序就一直等待,直到数据传输完毕为止
非阻塞:应用程序直接可以获取已准备就绪好的数据,无需等待。
BIO为同步阻塞形式,NIO为同步非阻塞形式。在JDK1.7之后,升级了NIO包,支持异步非阻塞通信模型NI02.0(AIO)
同步:应用程序会直接参与IO读写操作,并直接阻塞到某一个方法上,直到数据准备就绪;或者采用轮询的策略实时监测数据的就绪状态,如果就绪则获取数据。
异步:所有的IO读写操作交给操作系统处理,与应用程序没有直接关系。当操作系统完成了IO读写操作时,会给应用程序发通知,应用程序直接拿走数据即可。
BIO、NIO、AIO三者的区别
BIO:它属于传统的Socket编程,客户端与服务端连接的建立需要TCP的三次握手。服务端ServerSocket首先启动,指定端口并执行accepet进行阻塞,监听客户端的连接请求。若接收到客户端的连接请求并成功建立连接,客户端与服务端通过Socket套接字中的数据流进行相互之间的数据通信。针对每一个成功建立的连接,服务端都会创建一个线程去处理这个客户端请求,若建立的连接的客户端规模很大,对服务器的资源是一种严重浪费。
NIO:在NIO中引入了Channel通道,Buffer缓冲区、Selector多路复用器的概念。客户端SocketChannel与服务端ServerChannel都需要在Selector多路复用器上进行注册。服务器端会创建一个线程对注册到Selector多路复用器上的所有Channel进行轮询,轮序出处于就绪状态的Channel集合,根据为每个Channel分配的唯一key,获取具体的channel,并根据其状态标志位,进行处理,冲Channel中读取或写入数据,写到Buffer数据缓冲区。每个管道都会对selector进行注册不同的事件状态,方便selector查找,事件状态包括:SelectorKey.OP_CPMMECT连接状态、SelectionKey.OP_ACCEPT阻塞状态、SelectionKey.OP_READ可读状态、SelectionKey.OP_WRITE可写状态
AIO:使用线程池中的线程来处理客户端的请求,针对每一个客户端的请求,都会创建一个处理该任务的对象,如上面ServerComptionHandler类的对象,来完成读、写任务。AIO真正实现了异步非阻塞。
下一篇: 互联网技术10——queue队列