tomcat中的BIO应用——JIoEndpoint
一、BIO基础
BIO即阻塞式I/O,是Java提供的最基本的IO方式,在网络通信中,需要通过Socket在客户端与服务端建立双向链接以实现通信,其主要步骤如下:
1、服务端监听某个端口是否有链接请求。
2、客户端向服务端发出链接请求。
3、服务端向客户端返回Accept接受消息,此时链接成功。
4、客户端和服务端通过Send(), Write()等方法与对方通信。
5、关闭链接。
一张BIO的思维导图(图片来源:https://my.oschina.net/langxSpirit/blog/830620)
Java分别提供了两个类Socket和ServerSocket,用来表示双向链接的客户端和服务端,基于这两个类,一个简单的网络通信示例如下:
1、客户端
package com.wang.test.net.io.bio; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; /** * 客户端 */ public class Client { public static void main(String[] args) throws Exception { try ( //向本机的8080端口发送请求 Socket socket = new Socket("127.0.0.1", 8080); //根据标准输入构造BufferedReader对象 BufferedReader clientInput = new BufferedReader(new InputStreamReader(System.in)); //通过Socket得到输出流,构造PrintWriter对象 PrintWriter writer = new PrintWriter(socket.getOutputStream()); //通过Socket得到输入流,构造BufferedReader对象 BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); ){ //读取输入信息 String input = clientInput.readLine(); while ( !input.equals("exit") ){ //将输入信息发送到服务器 writer.print(input); //刷新输出流,使服务器端可以马上收到请求信息 writer.flush(); //读取服务器端返回信息 System.out.println("服务器端相应为:" + reader.readLine()); //读取下一条输入信息 input = clientInput.readLine(); } } catch ( Exception e ) { System.out.println(e); } } }
2、服务端
package com.wang.test.net.io.bio; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; /** * 服务器端 */ public class Server { public static void main(String[] args) { try( //创建ServerSocket监听端口8080 ServerSocket server = new ServerSocket(8080); //等待客户端请求 Socket socket = server.accept(); //根据标准输入构造BufferedReader对象 BufferedReader serverInput = new BufferedReader(new InputStreamReader(System.in)); //通过Socket对象得到输出流,BufferedReader BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); //通过Socket对象得到输出流,并构造PrintWrite对象 PrintWriter writer = new PrintWriter(socket.getOutputStream()); ){ //读取客户端请求 System.out.println("客户端请求:" + reader.readLine()); //输入服务端相应 String input = serverInput.readLine(); //如果输入内容为"exit",则退出 while (!input.equals("exit")){ writer.print(input); //向客户端输出该字符串 writer.flush(); //读取客户端请求 System.out.println("客户端请求:" + reader.readLine()); //输入服务器相应 input = serverInput.readLine(); } } catch ( Exception e ){ System.out.println(e); } } }
二、JIoEndpoint
Tomcat服务器实现中,Tomcat的IO监听由Endpoint完成,具体到BIO是JIoEndpoint,JIoEndpoint启动过程如下:
1、根据IP地址(多IP的情况)及端口创建ServerSocket实例;JIoEndpoint类bind()方法部分代码
if (serverSocket == null) { //根据IP地址(多IP的情况)及端口创建ServerSocket实例 try { if (getAddress() == null) { serverSocket = serverSocketFactory.createSocket(getPort(), getBacklog()); } else { serverSocket = serverSocketFactory.createSocket(getPort(), getBacklog(), getAddress()); } } catch (BindException orig) { String msg; if (getAddress() == null) msg = orig.getMessage() + " <null>:" + getPort(); else msg = orig.getMessage() + " " + getAddress().toString() + ":" + getPort(); BindException be = new BindException(msg); be.initCause(orig); throw be; } }
2、如果Connector没有配置共享线程池,创建请求处理线程池;JIoEndpoint类startInternal()方法部分代码
// Create worker collection if (getExecutor() == null) { //如果Connector没有配置共享线程池 createExecutor(); //创建请求处理线程池 }
createExecutor()方法实现在JIoEndpoint父抽象类中
public void createExecutor() { internalExecutor = true; TaskQueue taskqueue = new TaskQueue(); TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); }
3、根据AcceptThreadCount配置的数量,创建并启动Accept线程,JIoEndpoint类startInternal()方法部分代码
startAcceptorThreads(); //根据AcceptorThreadCount配置的数量,创建并启动Acceptor线程
startAcceptThread方法实现在父抽象类中:
/** * 根据AcceptorThreadCount配置的数量,创建并启动Acceptor线程 */ protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new Acceptor[count]; for (int i = 0; i < count; i++) { acceptors[i] = createAcceptor(); String threadName = getName() + "-Acceptor-" + i; acceptors[i].setThreadName(threadName); Thread t = new Thread(acceptors[i], threadName); //线程单独启动,并未放到线程池中,因此不会影响请求并发处理 t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } }
在该方法中,这些线程都是单独启动的,因此不会影响请求并发数量。createAcceptor获取JIoEndpoint类中的内部类Accept,该内部类继承JIoEndpoint父抽象类中的内部类Accept, Acceptor实现了Runnable接口,负责轮询接收客户端请求。JIoEndpoint类中的内部类Acceptor
/** * 负责轮询接收客户端请求, 还会检测EndPoint状态以及最大连接数 * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ protected class Acceptor extends AbstractEndpoint.Acceptor { @Override public void run() { int errorDelay = 0; // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait countUpOrAwaitConnection(); Socket socket = null; try { // Accept the next incoming connection from the server // socket socket = serverSocketFactory.acceptSocket(serverSocket); } catch (IOException ioe) { countDownConnection(); // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (running && !paused && setSocketOptions(socket)) { //当接收到客户端请求 // Hand this socket off to an appropriate processor if (!processSocket(socket)) { // countDownConnection(); // Close socket right away closeSocket(socket); } } else { countDownConnection(); // Close socket right away closeSocket(socket); } } catch (IOException x) { if (running) { log.error(sm.getString("endpoint.accept.fail"), x); } } catch (NullPointerException npe) { if (running) { log.error(sm.getString("endpoint.accept.fail"), npe); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; } }
4、当接收到客户端请求后,创建SocketProcessor对象,并提交给线程池处理。
5、SocketProcessor并未直接处理Socket,而是将其交由具体的协议处理类,BIO方式下的HTTP协议使用HTTP11Processor。
JIoEndpoint类中的内部类SocketProcessor
/** * This class is the equivalent of the Worker, but will simply use in an * external Executor thread pool. * SocketProcessor并未直接处理Socket,而是将其交由具体的协议处理类,如Http11Processor用于处理BIO方式下的HTTP协议 */ protected class SocketProcessor implements Runnable { protected SocketWrapper<Socket> socket = null; protected SocketStatus status = null; public SocketProcessor(SocketWrapper<Socket> socket) { if (socket==null) throw new NullPointerException(); this.socket = socket; } public SocketProcessor(SocketWrapper<Socket> socket, SocketStatus status) { this(socket); this.status = status; } @Override public void run() { boolean launch = false; synchronized (socket) { try { SocketState state = SocketState.OPEN; handler.beforeHandshake(socket); try { // SSL handshake serverSocketFactory.handshake(socket.getSocket()); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); if (log.isDebugEnabled()) { log.debug(sm.getString("endpoint.err.handshake"), t); } // Tell to close the socket state = SocketState.CLOSED; } if ((state != SocketState.CLOSED)) { if (status == null) { state = handler.process(socket, SocketStatus.OPEN_READ); } else { state = handler.process(socket,status); } } if (state == SocketState.CLOSED) { // Close socket if (log.isTraceEnabled()) { log.trace("Closing socket:"+socket); } countDownConnection(); try { socket.getSocket().close(); } catch (IOException e) { // Ignore } } else if (state == SocketState.OPEN || state == SocketState.UPGRADING || state == SocketState.UPGRADED){ socket.setKeptAlive(true); socket.access(); launch = true; } else if (state == SocketState.LONG) { socket.access(); waitingRequests.add(socket); } } finally { if (launch) { try { getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ)); } catch (RejectedExecutionException x) { log.warn("Socket reprocessing request was rejected for:"+socket,x); try { //unable to handle connection at this time handler.process(socket, SocketStatus.DISCONNECT); } finally { countDownConnection(); } } catch (NullPointerException npe) { if (running) { log.error(sm.getString("endpoint.launch.fail"), npe); } } } } } socket = null; // Finish up this request } }
6、此外,JIoEndpoint还构造了一个单独的线程用于检测超时请求。JIoEndpoint类中的startInternal方法的部分代码
// Start async timeout thread 用于检测超时请求 setAsyncTimeout(new AsyncTimeout()); Thread timeoutThread = new Thread(getAsyncTimeout(), getName() + "-AsyncTimeout"); timeoutThread.setPriority(threadPriority); timeoutThread.setDaemon(true); timeoutThread.start();
推荐阅读