java 并发
程序员文章站
2022-05-04 11:07:24
...
编号 | 名词 | 解释 | 举例 |
1 | 同步 | 指的是用户进程触发IO操作并等待或者轮询的去查看IO操作是否就绪 | 自己上街买衣服,自己亲自干这件事,别的事干不了。 |
2 | 异步 | 异步是指用户进程触发IO操作以后便开始做自己的事情,而当IO操作已经完成的时候会得到IO完成的通知(异步的特点就是通知) | 告诉朋友自己合适衣服的尺寸,大小,颜色,让朋友委托去卖,然后自己可以去干别的事。(使用异步IO时,Java将IO读写委托给OS处理,需要将数据缓冲区地址和大小传给OS) |
3 | 阻塞 | 所谓阻塞方式的意思是指, 当试图对该文件描述符进行读写时, 如果当时没有东西可读,或者暂时不可写, 程序就进入等待 状态, 直到有东西可读或者可写为止 | 去公交站充值,发现这个时候,充值员不在(可能上厕所去了),然后我们就在这里等待,一直等到充值员回来为止。(当然现实社会,可不是这样,但是在计算机里确实如此。) |
4 | 非阻塞 | 非阻塞状态下, 如果没有东西可读, 或者不可写, 读写函数马上返回, 而不会等待, | 银行里取款办业务时,领取一张小票,领取完后我们自己可以玩玩手机,或者与别人聊聊天,当轮我们时,银行的喇叭会通知,这时候我们就可以去了。 |
同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。
同步非阻塞IO(Java NIO) : 同步非阻塞,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。用户进程也需要时不时的询问IO操作是否就绪,这就要求用户进程不停的去询问。
异步阻塞IO(Java NIO):
此种方式下是指应用发起一个IO操作以后,不等待内核IO操作的完成,等内核完成IO操作以后会通知应用程序,这其实就是同步和异步最关键的区别,同步必须等待或者主动的去询问IO是否完成,那么为什么说是阻塞的呢?因为此时是通过select系统调用来完成的,而select函数本身的实现方式是阻塞的,而采用select函数有个好处就是它可以同时监听多个文件句柄(如果从UNP的角度看,select属于同步操作。因为select之后,进程还需要读写数据),从而提高系统的并发性!
(Java AIO(NIO.2))异步非阻塞IO:
在此种模式下,用户进程只需要发起一个IO操作然后立即返回,等IO操作真正的完成以后,应用程序会得到IO操作完成的通知,此时用户进程只需要对数据进行处理就好了,不需要进行实际的IO读写操作,因为真正的IO读取或者写入操作已经由内核完成了。
BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。
NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。
AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。
NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。
AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class EchoAioServer {
private final int port;
public static void main(String args[]) {
int port = 7354;
new EchoAioServer(port);
}
public EchoAioServer(int port) {
this.port = port;
listen();
}
private void listen() {
try {
ExecutorService executorService = Executors.newCachedThreadPool();
AsynchronousChannelGroup threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 2);
try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(threadGroup)) {
server.bind(new InetSocketAddress(port), 100);
System.out.println("Echo listen on " + port);
System.out.println("isOpen()= " + server.isOpen());
if (!server.isOpen()) {
return ;
}
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
final ByteBuffer echoBuffer = ByteBuffer.allocateDirect(1024);
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
System.out.println("waiting ....");
try {
echoBuffer.clear();
result.read(echoBuffer).get();
echoBuffer.flip();
// echo data
result.write(echoBuffer);
echoBuffer.flip();
// System.out.println("Echoed '" + new String(echoBuffer.array()) + "' to " + result);
} catch (InterruptedException | ExecutionException e) {
System.out.println(e.toString());
} finally {
try {
result.close();
server.accept(null, this);
} catch (Exception e) {
System.out.println(e.toString());
}
}
System.out.println("done...");
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("server failed: " + exc);
}
});
try {
// Wait for ever
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException ex) {
System.out.println(ex);
}
}
} catch (IOException e) {
System.out.println(e);
}
}
}
//客户端代码
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class EchoAioClient {
private final AsynchronousSocketChannel client ;
public EchoAioClient() throws Exception{
client = AsynchronousSocketChannel.open();
}
public void start()throws Exception{
client.connect(new InetSocketAddress("127.0.0.1",8000),null,new CompletionHandler<Void,Void>() {
@Override
public void completed(Void result, Void attachment) {
try {
client.write(ByteBuffer.wrap("this is a test".getBytes())).get();
System.out.println("send data to server");
} catch (Exception ex) {
ex.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Void attachment) {
exc.printStackTrace();
}
});
final ByteBuffer bb = ByteBuffer.allocate(1024);
client.read(bb, null, new CompletionHandler<Integer,Object>(){
@Override
public void completed(Integer result, Object attachment) {
System.out.println(result);
System.out.println(new String(bb.array()));
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
);
try {
// Wait for ever
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException ex) {
System.out.println(ex);
}
}
public static void main(String args[])throws Exception{
new EchoAioClient().start();
}
}