欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

tomcat中的BIO应用——JIoEndpoint

程序员文章站 2022-03-01 23:13:33
...

一、BIO基础

    BIO即阻塞式I/O,是Java提供的最基本的IO方式,在网络通信中,需要通过Socket在客户端与服务端建立双向链接以实现通信,其主要步骤如下:

    1、服务端监听某个端口是否有链接请求。

    2、客户端向服务端发出链接请求。

    3、服务端向客户端返回Accept接受消息,此时链接成功。

    4、客户端和服务端通过Send(), Write()等方法与对方通信。

    5、关闭链接。

 

    一张BIO的思维导图(图片来源:https://my.oschina.net/langxSpirit/blog/830620


tomcat中的BIO应用——JIoEndpoint
            
    
    博客分类: tomcat tomcatBIOJIoEndpoint
 

     Java分别提供了两个类Socket和ServerSocket,用来表示双向链接的客户端和服务端,基于这两个类,一个简单的网络通信示例如下:

    1、客户端

package com.wang.test.net.io.bio;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

/**
 * 客户端
 */
public class Client {

    public static void main(String[] args) throws Exception {
        try (
            //向本机的8080端口发送请求
            Socket socket = new Socket("127.0.0.1", 8080);
            //根据标准输入构造BufferedReader对象
            BufferedReader clientInput = new BufferedReader(new InputStreamReader(System.in));
            //通过Socket得到输出流,构造PrintWriter对象
            PrintWriter writer = new PrintWriter(socket.getOutputStream());
            //通过Socket得到输入流,构造BufferedReader对象
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        ){
            //读取输入信息
            String input = clientInput.readLine();
            while ( !input.equals("exit") ){
                //将输入信息发送到服务器
                writer.print(input);
                //刷新输出流,使服务器端可以马上收到请求信息
                writer.flush();
                //读取服务器端返回信息
                System.out.println("服务器端相应为:" + reader.readLine());
                //读取下一条输入信息
                input = clientInput.readLine();
            }
        } catch ( Exception e ) {
            System.out.println(e);
        }
    }
}

     2、服务端

package com.wang.test.net.io.bio;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * 服务器端
 */
public class Server {

    public static void main(String[] args) {
        try(
            //创建ServerSocket监听端口8080
            ServerSocket server = new ServerSocket(8080);
            //等待客户端请求
            Socket socket = server.accept();
            //根据标准输入构造BufferedReader对象
            BufferedReader serverInput = new BufferedReader(new InputStreamReader(System.in));
            //通过Socket对象得到输出流,BufferedReader
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            //通过Socket对象得到输出流,并构造PrintWrite对象
            PrintWriter writer = new PrintWriter(socket.getOutputStream());
        ){
            //读取客户端请求
            System.out.println("客户端请求:" + reader.readLine());
            //输入服务端相应
            String input = serverInput.readLine();
            //如果输入内容为"exit",则退出
            while (!input.equals("exit")){
                writer.print(input);
                //向客户端输出该字符串
                writer.flush();
                //读取客户端请求
                System.out.println("客户端请求:" + reader.readLine());
                //输入服务器相应
                input = serverInput.readLine();
            }
        } catch ( Exception e ){
            System.out.println(e);
        }
    }
}

 

二、JIoEndpoint

    Tomcat服务器实现中,Tomcat的IO监听由Endpoint完成,具体到BIO是JIoEndpoint,JIoEndpoint启动过程如下:

    1、根据IP地址(多IP的情况)及端口创建ServerSocket实例;JIoEndpoint类bind()方法部分代码

        if (serverSocket == null) {	//根据IP地址(多IP的情况)及端口创建ServerSocket实例
            try {
                if (getAddress() == null) {
                    serverSocket = serverSocketFactory.createSocket(getPort(),
                            getBacklog());
                } else {
                    serverSocket = serverSocketFactory.createSocket(getPort(),
                            getBacklog(), getAddress());
                }
            } catch (BindException orig) {
                String msg;
                if (getAddress() == null)
                    msg = orig.getMessage() + " <null>:" + getPort();
                else
                    msg = orig.getMessage() + " " +
                            getAddress().toString() + ":" + getPort();
                BindException be = new BindException(msg);
                be.initCause(orig);
                throw be;
            }
        }

     2、如果Connector没有配置共享线程池,创建请求处理线程池;JIoEndpoint类startInternal()方法部分代码

            // Create worker collection
            if (getExecutor() == null) {	//如果Connector没有配置共享线程池
                createExecutor();	//创建请求处理线程池
            }

     createExecutor()方法实现在JIoEndpoint父抽象类中

    public void createExecutor() {
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent( (ThreadPoolExecutor) executor);
    }

     3、根据AcceptThreadCount配置的数量,创建并启动Accept线程,JIoEndpoint类startInternal()方法部分代码

startAcceptorThreads();	//根据AcceptorThreadCount配置的数量,创建并启动Acceptor线程

     startAcceptThread方法实现在父抽象类中:

/**
     * 根据AcceptorThreadCount配置的数量,创建并启动Acceptor线程
     */
    protected final void startAcceptorThreads() {
        int count = getAcceptorThreadCount();
        acceptors = new Acceptor[count];

        for (int i = 0; i < count; i++) {
            acceptors[i] = createAcceptor();
            String threadName = getName() + "-Acceptor-" + i;
            acceptors[i].setThreadName(threadName);
            Thread t = new Thread(acceptors[i], threadName);	//线程单独启动,并未放到线程池中,因此不会影响请求并发处理
            t.setPriority(getAcceptorThreadPriority());
            t.setDaemon(getDaemon());
            t.start();
        }
    }

     在该方法中,这些线程都是单独启动的,因此不会影响请求并发数量。createAcceptor获取JIoEndpoint类中的内部类Accept,该内部类继承JIoEndpoint父抽象类中的内部类Accept, Acceptor实现了Runnable接口,负责轮询接收客户端请求。JIoEndpoint类中的内部类Acceptor

/**
     * 负责轮询接收客户端请求, 还会检测EndPoint状态以及最大连接数
     * The background thread that listens for incoming TCP/IP connections and
     * hands them off to an appropriate processor.
     */
    protected class Acceptor extends AbstractEndpoint.Acceptor {

        @Override
        public void run() {

            int errorDelay = 0;

            // Loop until we receive a shutdown command
            while (running) {

                // Loop if endpoint is paused
                while (paused && running) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {
                    break;
                }
                state = AcceptorState.RUNNING;

                try {
                    //if we have reached max connections, wait
                    countUpOrAwaitConnection();

                    Socket socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        socket = serverSocketFactory.acceptSocket(serverSocket);	
                    } catch (IOException ioe) {
                        countDownConnection();
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // Configure the socket
                    if (running && !paused && setSocketOptions(socket)) {	//当接收到客户端请求
                        // Hand this socket off to an appropriate processor
                        if (!processSocket(socket)) {	//
                            countDownConnection();
                            // Close socket right away
                            closeSocket(socket);
                        }
                    } else {
                        countDownConnection();
                        // Close socket right away
                        closeSocket(socket);
                    }
                } catch (IOException x) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), x);
                    }
                } catch (NullPointerException npe) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), npe);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;
        }
    }

    4、当接收到客户端请求后,创建SocketProcessor对象,并提交给线程池处理。

    5、SocketProcessor并未直接处理Socket,而是将其交由具体的协议处理类,BIO方式下的HTTP协议使用HTTP11Processor。

    JIoEndpoint类中的内部类SocketProcessor

/**
     * This class is the equivalent of the Worker, but will simply use in an
     * external Executor thread pool.
     * SocketProcessor并未直接处理Socket,而是将其交由具体的协议处理类,如Http11Processor用于处理BIO方式下的HTTP协议
     */
    protected class SocketProcessor implements Runnable {

        protected SocketWrapper<Socket> socket = null;
        protected SocketStatus status = null;

        public SocketProcessor(SocketWrapper<Socket> socket) {
            if (socket==null) throw new NullPointerException();
            this.socket = socket;
        }

        public SocketProcessor(SocketWrapper<Socket> socket, SocketStatus status) {
            this(socket);
            this.status = status;
        }

        @Override
        public void run() {
            boolean launch = false;
            synchronized (socket) {
                try {
                    SocketState state = SocketState.OPEN;
                    handler.beforeHandshake(socket);
                    try {
                        // SSL handshake
                        serverSocketFactory.handshake(socket.getSocket());
                    } catch (Throwable t) {
                        ExceptionUtils.handleThrowable(t);
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString("endpoint.err.handshake"), t);
                        }
                        // Tell to close the socket
                        state = SocketState.CLOSED;
                    }

                    if ((state != SocketState.CLOSED)) {
                        if (status == null) {
                            state = handler.process(socket, SocketStatus.OPEN_READ);
                        } else {
                            state = handler.process(socket,status);
                        }
                    }
                    if (state == SocketState.CLOSED) {
                        // Close socket
                        if (log.isTraceEnabled()) {
                            log.trace("Closing socket:"+socket);
                        }
                        countDownConnection();
                        try {
                            socket.getSocket().close();
                        } catch (IOException e) {
                            // Ignore
                        }
                    } else if (state == SocketState.OPEN ||
                            state == SocketState.UPGRADING  ||
                            state == SocketState.UPGRADED){
                        socket.setKeptAlive(true);
                        socket.access();
                        launch = true;
                    } else if (state == SocketState.LONG) {
                        socket.access();
                        waitingRequests.add(socket);
                    }
                } finally {
                    if (launch) {
                        try {
                            getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
                        } catch (RejectedExecutionException x) {
                            log.warn("Socket reprocessing request was rejected for:"+socket,x);
                            try {
                                //unable to handle connection at this time
                                handler.process(socket, SocketStatus.DISCONNECT);
                            } finally {
                                countDownConnection();
                            }


                        } catch (NullPointerException npe) {
                            if (running) {
                                log.error(sm.getString("endpoint.launch.fail"),
                                        npe);
                            }
                        }
                    }
                }
            }
            socket = null;
            // Finish up this request
        }

    }

 6、此外,JIoEndpoint还构造了一个单独的线程用于检测超时请求。JIoEndpoint类中的startInternal方法的部分代码

      // Start async timeout thread 用于检测超时请求
      setAsyncTimeout(new AsyncTimeout());
      Thread timeoutThread = new Thread(getAsyncTimeout(), getName() + "-AsyncTimeout");
      timeoutThread.setPriority(threadPriority);
      timeoutThread.setDaemon(true);
      timeoutThread.start();

 

 

 

  • tomcat中的BIO应用——JIoEndpoint
            
    
    博客分类: tomcat tomcatBIOJIoEndpoint
  • 大小: 119.7 KB