Java通讯模型-BIO、NIO、AIO综合演练
一.前言
1.常规技术:Spring系统、ORM组件、服务支持;
数据表的CRUD处理(重复且大量的编写),这种开发好像不是开发的感觉。
2.未来的开发人才到底该具备哪些技能—架构师
a.可以完成项目,同时可以很好的沟通;
b.掌握各种常规的开发技术,并且掌握一些服务组件的使用(需要有好的运维);
c.良好的代码设计能力 —— 代码重用与标准设定;
d.非常清楚底层通讯机制,并且可以根据实际的业务需求,进行底层通讯协议的定义;
3.网络通讯的核心思想:请求-回应
网络七层模型: 应用层、表示层、会话层、传输层(数据段)、网络层(数据包)、数据链路层(数据帧)、物理层(比特流)。
TCP协议是整个现代项目开发中的基础,包括HTTP协议也都是在TCP基础上实现的。
二.案例说明
本次的案例:采用一个标准的ECHO程序
客户端输入一个内容,随后服务器端接收到之后进行数据的返回,在数据前面追加有"【ECHO】"的信息。
"telnet 主机名称 端口号",主要是进行TCP协议的通讯,而对于服务器端是如何实现的并不关注。
项目代码结构:
基础公共服务类:
InputUtil.java
package com.bijian.study.util; import java.io.BufferedReader; import java.io.InputStreamReader; /** * 实现键盘输入数据的处理操作 */ public class InputUtil { // 对于键盘输入数据的操作而言,很明显使用BufferedReader要比使用Scanner更加方便 private static final BufferedReader KEYBOARD_INPUT = new BufferedReader(new InputStreamReader(System.in)); private InputUtil() { } // 内部将直接提供有static方法 /** * 通过键盘输入数据并返回有字符串的内容 * @param prompt 提示信息 * @return 一个输入的字符串,并且该字符串的内容不为空 */ public static String getString(String prompt) { String returnData = null; // 进行接收数据的返回 boolean flag = true; // 进行数据验证的基础逻辑判断 while (flag) { System.out.print(prompt); try { returnData = KEYBOARD_INPUT.readLine(); // 通过键盘读取数据 if (returnData == null || "".equals(returnData)) { System.err.println("输入的数据不允许为空!"); } else { flag = false; // 结束循环 } } catch (Exception e) { System.err.println("输入的数据错误!"); } } return returnData; } }
ServerInfo.java
package com.bijian.study.commons; public interface ServerInfo { public static final int PORT = 6789; // 定义服务器端的访问端口 public static final String ECHO_SERVER_HOST = "localhost"; // 要访问的远程主机名称 }
三.BIO
【JDK 1.0】实现BIO程序开发:同步阻塞IO操作,每一个线程都只会管理一个客户端的链接,这种操作的本质是存在有程序阻塞的问题。
线程的资源总是在不断的进行创建,并且所有的数据接收里面(Scanner、PrintStream简化了),网络的数据都是长度限制的,传统的数据是需要通过字节数组的形式搬砖完成的。
BIO:是需要对数据采用蚂蚁搬家的模式完成的。
程序问题:性能不高、多线程的利用率不高、如果大规模的用户访问,有可能会造成服务器端资源耗尽。
EchoServer.java
package com.bijian.study.bio.server; import java.io.PrintStream; import java.net.ServerSocket; import java.net.Socket; import java.util.Scanner; import com.bijian.study.commons.ServerInfo; class EchoServerHandle implements AutoCloseable { private ServerSocket serverSocket; public EchoServerHandle() throws Exception { this.serverSocket = new ServerSocket(ServerInfo.PORT); // 进行服务端的Socket启动 System.out.println("ECHO服务器端已经启动了,该服务在" + ServerInfo.PORT + "端口上监听...."); this.clientConnect(); } private void clientConnect() throws Exception { boolean serverFlag = true; while (serverFlag) { Socket client = this.serverSocket.accept(); // 等待客户端连接 Thread clientThread = new Thread(() -> { try { Scanner scan = new Scanner(client.getInputStream());// 服务器端输入为客户端输出 PrintStream out = new PrintStream(client.getOutputStream());// 服务器端的输出为客户端输入 scan.useDelimiter("\n"); // 设置分隔符 boolean clientFlag = true; while (clientFlag) { if (scan.hasNext()) { // 现在有内容 String inputData = scan.next(); // 获得输入数据 if(inputData.startsWith("exit")) { // 信息结束 //if ("exit".equalsIgnoreCase(inputData)) { clientFlag = false; // 结束内部的循环 out.println("[ECHO]Bye Bye ... kiss"); // 一定需要提供有一个换行机制,否则Scanner不好读取 } else { out.println("[ECHO]" + inputData); // 回应信息 } } } client.close(); } catch (Exception e) { e.printStackTrace(); } }); clientThread.start(); // 启动多线程 } } @Override public void close() throws Exception { this.serverSocket.close(); } } /** * 实现服务器端的编写开发,采用BIO(阻塞模式)实现开发的基础结构 */ public class EchoServer { public static void main(String[] args) throws Exception { new EchoServerHandle(); } }
EchoClient.java
package com.bijian.study.bio.client; import java.io.PrintStream; import java.net.Socket; import java.util.Scanner; import com.bijian.study.commons.ServerInfo; import com.bijian.study.util.InputUtil; class EchoClientHandle implements AutoCloseable { private Socket client; public EchoClientHandle() throws Exception { this.client = new Socket(ServerInfo.ECHO_SERVER_HOST, ServerInfo.PORT); System.out.println("已经成功的连接到了服务器端,可以进行消息的发送处理。"); this.accessServer(); } private void accessServer() throws Exception { // 数据交互处理 Scanner scan = new Scanner(this.client.getInputStream()) ; // 服务器端的输出为客户端的输入 PrintStream out = new PrintStream(this.client.getOutputStream()) ; // 向服务器端发送内容 scan.useDelimiter("\n") ; boolean flag = true ; while(flag) { String data = InputUtil.getString("请输入要发送的数据信息:") ; out.println(data); // 先把内容发送到服务器端上 if ("exit".equalsIgnoreCase(data)) { flag = false ; // 结束循环s } if (scan.hasNext()) { System.out.println(scan.next()); } } } @Override public void close() throws Exception { this.client.close(); } } public class EchoClient { public static void main(String[] args) { try (EchoClientHandle echo = new EchoClientHandle()) { } catch(Exception e) {} } }
四.NIO
【JDK 1.4】提供有了一个java.nio的开发包,从这一时代开始,Java提升了IO的处理效率,同时也提升网络模型的处理效率,同时NIO里面采用的是同步非阻塞IO操作。
NIO的出现在当时来讲,给Java带来了一个最伟大的通讯利器(已经接近于底层的传输性能)。
NIO里面提倡使用Buffer来代替传统的数组操作(蚂蚁搬家),可以减少数组的操作复杂度,利用缓存数据的保存与方便的清空操作进行处理。
Reactor模型提倡的是:公共注册,统一操作,所以会提供有一系列的Channel(通道)。
NIOEchoServer.java
package com.bijian.study.nio.server; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.bijian.study.commons.ServerInfo; /** * 基于NIO实现数据的交互处理模式 */ public class NIOEchoServer { public static void main(String[] args) throws Exception { new EchoServerHandle(); } } /** * 实现一个专门用于客户端请求处理的线程对象 */ class SocketClientChannelThread implements Runnable { private SocketChannel clientChannel; // 客户端的信息 private boolean flag = true; // 循环处理的标记 public SocketClientChannelThread(SocketChannel clientChannel) { this.clientChannel = clientChannel; System.out.println("服务器端连接成功,可以与服务器端进行数据的交互操作..."); } @Override public void run() { // 真正的通讯处理的核心需要通过run()方法来进行操作 // NIO是基于Buffer缓冲操作实现的功能,需要将输入的内容保存在缓存之中 ByteBuffer buffer = ByteBuffer.allocate(50); // 开辟一个50大小的缓存空间 try { while (this.flag) { buffer.clear(); // 清空缓存操作,可以进行该缓存空间的重复使用 int readCount = this.clientChannel.read(buffer); // 服务器端读取客户端发送来的内容 // 将缓冲区之中保存的内容转位字节数组之后进行存储 String readMessage = new String(buffer.array(), 0, readCount).trim(); System.out.println("[服务器端接收消息]" + readMessage); // 输出一下提示信息 // 在进行整个的通讯过程里面,分隔符是一个绝对重要的概念,如果不能够很好的处理分隔符,那么无法进行有效通讯 String writeMessage = "[ECHO]" + readMessage + "\n"; // 进行消息的回应处理 if ("exit".equalsIgnoreCase(readMessage)) { writeMessage = "[ECHO]Bye Byte ... kiss"; // 结束消息 this.flag = false; // 要结束当前的循环操作 } // 现在的数据是在字符串之中,如果要回应内容,需要将内容保存在Buffer之中 buffer.clear(); // 将已经保存的内容(内容已经处理完毕)清除 buffer.put(writeMessage.getBytes()); // 保存回应信息 buffer.flip(); // 重置缓冲区 this.clientChannel.write(buffer); } this.clientChannel.close(); } catch (Exception e) { e.printStackTrace(); } } } class EchoServerHandle implements AutoCloseable { // 定义服务器端的服务处理类 private ExecutorService executorService; private ServerSocketChannel serverSocketChannel; // 服务器端的通讯通道 private Selector selector; private SocketChannel clientChannel; // 客户端的信息 public EchoServerHandle() throws Exception { this.executorService = Executors.newFixedThreadPool(5); // 当前的执行线程只允许有5个 this.serverSocketChannel = ServerSocketChannel.open(); // 打开服务器端连接通道 this.serverSocketChannel.configureBlocking(false); // 设置为非阻塞模式 this.serverSocketChannel.bind(new InetSocketAddress(ServerInfo.PORT)); // NIO之中的Reactor模型重点在于所有的Channel需要向Selector之中进行注册 this.selector = Selector.open(); // 获取Selector实例 this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); // 服务器端需要进行接收 System.out.println("服务器端程序启动,该程序在" + ServerInfo.PORT + "端口上进行监听..."); this.clientHandle(); } private void clientHandle() throws Exception { int keySelect = 0; // 保存一个当前的状态 while ((keySelect = this.selector.select()) > 0) { // 需要进行连接等待 Set<SelectionKey> selectedKeys = this.selector.selectedKeys(); // 获取全部连接通道信息 Iterator<SelectionKey> selectionIter = selectedKeys.iterator(); while (selectionIter.hasNext()) { SelectionKey selectionKey = selectionIter.next(); // 获取每一个通道 if (selectionKey.isAcceptable()) { // 该通道为接收状态 this.clientChannel = this.serverSocketChannel.accept(); // 等待连接 if (this.clientChannel != null) { // 当前有连接 this.executorService.submit(new SocketClientChannelThread(this.clientChannel)); } } selectionIter.remove(); // 移除掉此通道 } } } @Override public void close() throws Exception { this.executorService.shutdown(); // 关闭线程池 this.serverSocketChannel.close(); // 关闭服务器端 } }
NIOEchoClient.java
package com.bijian.study.nio.client; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import com.bijian.study.commons.ServerInfo; import com.bijian.study.util.InputUtil; /** * 进行NIO客户端的连接访问 */ public class NIOEchoClient { public static void main(String[] args) throws Exception { try(EchoClientHandle handle = new EchoClientHandle()) { } catch (Exception e) {} } } class EchoClientHandle implements AutoCloseable { private SocketChannel clientChannel ; public EchoClientHandle() throws Exception { this.clientChannel = SocketChannel.open() ; // 创建一个客户端的通道实例 // 设置要连接的主机信息,包括主机名称以及端口号 this.clientChannel.connect(new InetSocketAddress(ServerInfo.ECHO_SERVER_HOST,ServerInfo.PORT)) ; this.accessServer(); } public void accessServer() throws Exception { // 访问服务器端 ByteBuffer buffer = ByteBuffer.allocate(50) ; // 开辟一个缓冲区 boolean flag = true ; while(flag) { buffer.clear() ; // 清空缓冲区,因为该部分代码会重复执行 String msg = InputUtil.getString("请输入要发送的内容:") ; buffer.put(msg.getBytes()) ; // 将此数据保存在缓冲区之中 buffer.flip() ; // 重置缓冲区 this.clientChannel.write(buffer) ; // 发送数据内容 // 当消息发送过去之后还需要进行返回内容的接收处理 buffer.clear() ; // 清空缓冲区,等待新的内容的输入 int readCount = this.clientChannel.read(buffer) ; // 将内容读取到缓冲区之中,并且返回个数 buffer.flip() ; // 得到前需要进行重置 System.out.println(new String(buffer.array(),0,readCount)); // 输出信息 if ("exit".equalsIgnoreCase(msg)) { flag = false ; } } } @Override public void close() throws Exception { this.clientChannel.close(); } }
五.AIO
【JDK 1.7】AIO,异步非阻塞IO通讯,需要采用大量的回调处理模式,所以需要使用:
public interface CompletionHandler<V,A>
AIOEchoServer.java
package com.bijian.study.aio.server; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; import com.bijian.study.commons.ServerInfo; /** * 2.实现客户端的回应处理操作 */ class EchoHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private boolean exit = false; // 进行操作的结束标记判断 public EchoHandler(AsynchronousSocketChannel clientChannel) { this.clientChannel = clientChannel; } @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); String readMessage = new String(buffer.array(), 0, buffer.remaining()).trim(); System.out.println("[服务器端接收到消息内容]" + readMessage); String resultMessage = "[ECHO]" + readMessage + "\n"; // 回应信息 if ("exit".equalsIgnoreCase(readMessage)) { resultMessage = "[EXIT]Bye Bye ... kiss" + "\n"; this.exit = true; // 结束 } this.echoWrite(resultMessage); } private void echoWrite(String result) { ByteBuffer buffer = ByteBuffer.allocate(50); buffer.put(result.getBytes()); buffer.flip(); this.clientChannel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { // 当前有数据 EchoHandler.this.clientChannel.write(buffer, buffer, this); } else { if (EchoHandler.this.exit == false) { // 需要继续交互 ByteBuffer readBuffer = ByteBuffer.allocate(50); EchoHandler.this.clientChannel.read(readBuffer,readBuffer,new EchoHandler(EchoHandler.this.clientChannel)); } } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { EchoHandler.this.clientChannel.close(); } catch (Exception e) { e.printStackTrace(); } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.clientChannel.close(); // 关闭通道 } catch (Exception e) { } } } /** * 1.实现客户端连接回调的处理操作 */ class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AIOServerThread> { @Override public void completed(AsynchronousSocketChannel result, AIOServerThread attachment) { attachment.getServerChannel().accept(attachment, this); // 接收连接对象 ByteBuffer buffer = ByteBuffer.allocate(50); result.read(buffer, buffer, new EchoHandler(result)); } @Override public void failed(Throwable exc, AIOServerThread attachment) { System.out.println("服务器端客户端连接失败 ..."); attachment.getLatch().countDown(); // 恢复执行 } } class AIOServerThread implements Runnable { // 是进行AIO处理的线程类 private AsynchronousServerSocketChannel serverChannel; private CountDownLatch latch; // 进行线程等待操作 public AIOServerThread() throws Exception { this.latch = new CountDownLatch(1); // 设置一个线程等待个数 this.serverChannel = AsynchronousServerSocketChannel.open(); // 打开异步的通道 this.serverChannel.bind(new InetSocketAddress(ServerInfo.PORT)); // 绑定服务端口 System.out.println("服务器启动成功,在" + ServerInfo.PORT + "端口上监听服务 ..."); } public AsynchronousServerSocketChannel getServerChannel() { return serverChannel; } public CountDownLatch getLatch() { return latch; } @Override public void run() { this.serverChannel.accept(this, new AcceptHandler()); // 等待客户端连接 try { this.latch.await(); // 进入等待时机 } catch (Exception e) { } } } public class AIOEchoServer { public static void main(String[] args) throws Exception { new Thread(new AIOServerThread()).start(); } }
AIOEchoClient.java
package com.bijian.study.aio.client; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; import com.bijian.study.commons.ServerInfo; import com.bijian.study.util.InputUtil; class ClientReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ClientReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); String receiveMessage = new String(buffer.array(), 0, buffer.remaining()); System.out.println(receiveMessage); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.clientChannel.close(); } catch (Exception e) { } this.latch.countDown(); } } class ClientWriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ClientWriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { this.clientChannel.write(buffer, buffer, this); } else { ByteBuffer readBuffer = ByteBuffer.allocate(50); this.clientChannel.read(readBuffer, readBuffer,new ClientReadHandler(this.clientChannel, this.latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.clientChannel.close(); } catch (Exception e) { } this.latch.countDown(); } } class AIOClientThread implements Runnable { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public AIOClientThread() throws Exception { this.clientChannel = AsynchronousSocketChannel.open(); // 打开客户端的Channel this.clientChannel.connect(new InetSocketAddress(ServerInfo.ECHO_SERVER_HOST, ServerInfo.PORT)); this.latch = new CountDownLatch(1); } @Override public void run() { try { this.latch.await(); this.clientChannel.close(); } catch (Exception e) { } } /** * 进行消息的发送处理 * @param msg 输入的交互内容 * @return 是否停止交互的标记 */ public boolean sendMessage(String msg) { ByteBuffer buffer = ByteBuffer.allocate(50); buffer.put(msg.getBytes()); buffer.flip(); this.clientChannel.write(buffer, buffer, new ClientWriteHandler(this.clientChannel, this.latch)); if ("exit".equalsIgnoreCase(msg)) { return false; } return true; } } public class AIOEchoClient { public static void main(String[] args) throws Exception { AIOClientThread client = new AIOClientThread(); new Thread(client).start(); while (client.sendMessage(InputUtil.getString("请输入要发送的信息:"))) { ; } } }
六.总结
BIO(同步阻塞IO):在进行处理的时候是通过一个线程进行操作,并且IO实现通讯的时候采用的是阻塞模式; 你现在通过水壶烧水,在BIO的世界里面,烧水这一过程你需要从头一直监视到结尾;
NIO(同步非阻塞IO):不断的进行烧水状态的判断,同时你可以做其他的事情;
AIO(异步非阻塞IO):烧水的过程你不用关注,如果水一旦烧好了,就会给你一个反馈。
以上所讲解的程序都属于最为基础的通讯模型,但是如果你真的只是依靠这样的开发操作去编写程序,那么基本上就决定你的项目八成会失败。
真实的项目之中如果要想实现这些通讯的操作一般要考虑:粘包和拆包、序列化、HTTP协议如何去写、WebSocket的定义实现。都需要你去精通这些协议。
为了简化这些操作在实际的项目里面,强烈建议,这些底层功能都通过Netty包装。
特别说明:这是《开课吧》的网络公开课的笔记
上一篇: ubuntu下的nginx+php+mysql配置
下一篇: php如何判断文件是否被修改