欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Tomcat7中一次请求处理的前世今生(二)Socket连接转换成内部请求对象

程序员文章站 2022-03-07 21:29:25
...

先抛开之前所看到的Tomcat源码不谈,Tomcat作为一个用Java实现的Web服务器,如果让你来实现,那么从何入手?

这里首先需要厘清的是Web服务器的概念,谷歌了一下,发现这条解释还算靠谱点,【在网络环境下可以向发出请求的浏览器提供文档的程序】。这里面重点有两条,1.网络环境下,2.能够给出响应。用Java写过网络通信程序的都知道,这里必然会用到Socket编程。我们自己要实现的服务器程序作为Socket编程里的服务端,浏览器作为Socket编程里的客户端。

要理解Tomcat原理,Socket编程这块的基本原理必须得了解,google一把一大堆,这里不再单独做介绍。下面给出一个服务器端最简单的响应客户端请求的伪代码示例:

			ServerSocket serverSocket  = new ServerSocket(8080, 1,
					InetAddress.getByName(“localhost”));
			Socket socket = null;
			InputStream is = null;
			OutputStream os = null;
			try {
				socket = serverSocket.accept();//1.监听到客户端的连接
				is = socket.getInputStream();
				os = socket.getOutputStream();
				Request request = Util.getRequest(is);//2.从输入流中读取数据,并根据Http协议转换成请求
				Response response = Util.service(request);//服务器内部根据请求信息给出响应信息
				os.writeResponse(response);//3.将响应信息写到输出流
			} catch (Exception e) {
				e.printStackTrace();
			} finally {//4.关闭输入输出流及连接
				if (is != null) {
					is.close();
				}
				if (os != null) {
					os.close();
				}
				socket.close();
			}

浏览器和Web服务器的一次交互过程分四步:连接、请求、响应、关闭。前一篇文章分析到的接收器线程,如前文开始截图里的http-bio-8080-Acceptor-0,这个线程的实现类org.apache.tomcat.util.net.JIoEndpoint.Acceptor,源码如下:

    // --------------------------------------------------- Acceptor Inner Class
    /**
     * The background thread that listens for incoming TCP/IP connections and
     * hands them off to an appropriate processor.
     */
    protected class Acceptor extends AbstractEndpoint.Acceptor {

        @Override
        public void run() {

            int errorDelay = 0;

            // Loop until we receive a shutdown command
            while (running) {

                // Loop if endpoint is paused
                while (paused && running) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {
                    break;
                }
                state = AcceptorState.RUNNING;

                try {
                    //if we have reached max connections, wait
                    countUpOrAwaitConnection();

                    Socket socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        socket = serverSocketFactory.acceptSocket(serverSocket);
                    } catch (IOException ioe) {
                        countDownConnection();
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // Configure the socket
                    if (running && !paused && setSocketOptions(socket)) {
                        // Hand this socket off to an appropriate processor
                        if (!processSocket(socket)) {
                            countDownConnection();
                            // Close socket right away
                            closeSocket(socket);
                        }
                    } else {
                        countDownConnection();
                        // Close socket right away
                        closeSocket(socket);
                    }
                } catch (IOException x) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), x);
                    }
                } catch (NullPointerException npe) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), npe);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;
        }
    }

第39行做的事就是上面伪代码示例里的监听客户端连接,监听到连接后(即浏览器向服务器发起一次请求)在第53行由processSocket方法来处理这次接收到的Socket连接。processSocket方法源码如下:

    protected boolean processSocket(Socket socket) {
        // Process the request from this socket
        try {
            SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket);
            wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
            // During shutdown, executor may be null - avoid NPE
            if (!running) {
                return false;
            }
            getExecutor().execute(new SocketProcessor(wrapper));
        } catch (RejectedExecutionException x) {
            log.warn("Socket processing request was rejected for:"+socket,x);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

该方法中先把Socket包装成SocketWrapper,用以内部处理。重点是第10行:getExecutor().execute(new SocketProcessor(wrapper))。如果按照上面伪代码中的处理方式,在有并发请求的情况下,一个请求没有处理完成,服务器将无法立即响应另一个请求。这里做了一下改进,即在接收到一次Socket连接后另启一个线程处理该连接,使当前线程不阻塞。

 

下面跟着SocketProcessor这个线程来看看,一次Socket连接是如何在Tomcat7中被转成内部的Request的。看下该线程的run方法:

        @Override
        public void run() {
            boolean launch = false;
            synchronized (socket) {
                try {
                    SocketState state = SocketState.OPEN;

                    try {
                        // SSL handshake
                        serverSocketFactory.handshake(socket.getSocket());
                    } catch (Throwable t) {
                        ExceptionUtils.handleThrowable(t);
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString("endpoint.err.handshake"), t);
                        }
                        // Tell to close the socket
                        state = SocketState.CLOSED;
                    }

                    if ((state != SocketState.CLOSED)) {
                        if (status == null) {
                            state = handler.process(socket, SocketStatus.OPEN);
                        } else {
                            state = handler.process(socket,status);
                        }
                    }
                    if (state == SocketState.CLOSED) {
                        // Close socket
                        if (log.isTraceEnabled()) {
                            log.trace("Closing socket:"+socket);
                        }
                        countDownConnection();
                        try {
                            socket.getSocket().close();
                        } catch (IOException e) {
                            // Ignore
                        }
                    } else if (state == SocketState.OPEN ||
                            state == SocketState.UPGRADING  ||
                            state == SocketState.UPGRADED){
                        socket.setKeptAlive(true);
                        socket.access();
                        launch = true;
                    } else if (state == SocketState.LONG) {
                        socket.access();
                        waitingRequests.add(socket);
                    }
                } finally {
                    if (launch) {
                        try {
                            getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN));
                        } catch (RejectedExecutionException x) {
                            log.warn("Socket reprocessing request was rejected for:"+socket,x);
                            try {
                                //unable to handle connection at this time
                                handler.process(socket, SocketStatus.DISCONNECT);
                            } finally {
                                countDownConnection();
                            }


                        } catch (NullPointerException npe) {
                            if (running) {
                                log.error(sm.getString("endpoint.launch.fail"),
                                        npe);
                            }
                        }
                    }
                }
            }
            socket = null;
            // Finish up this request
        }

    }

默认情况下会走到第22行,调用handler对象的process方法,这里handler对象实际上是Http11ConnectionHandler类的实例,该对象的初始化过程是在org.apache.coyote.http11.Http11Protocol对象的构造方法中:

    public Http11Protocol() {
        endpoint = new JIoEndpoint();
        cHandler = new Http11ConnectionHandler(this);
        ((JIoEndpoint) endpoint).setHandler(cHandler);
        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
        setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
    }

所以需要到org.apache.coyote.http11.Http11Protocol类的静态内部类Http11ConnectionHandler中找到process方法的定义,但当前定义里面没有,这个方法是在其父类org.apache.coyote.AbstractProtocol.AbstractConnectionHandler中定义的:

        public SocketState process(SocketWrapper<S> socket,
                SocketStatus status) {
            Processor<S> processor = connections.remove(socket.getSocket());

            if (status == SocketStatus.DISCONNECT && processor == null) {
                //nothing more to be done endpoint requested a close
                //and there are no object associated with this connection
                return SocketState.CLOSED;
            }

            socket.setAsync(false);

            try {
                if (processor == null) {
                    processor = recycledProcessors.poll();
                }
                if (processor == null) {
                    processor = createProcessor();
                }

                initSsl(socket, processor);

                SocketState state = SocketState.CLOSED;
                do {
                    if (status == SocketStatus.DISCONNECT &&
                            !processor.isComet()) {
                        // Do nothing here, just wait for it to get recycled
                        // Don't do this for Comet we need to generate an end
                        // event (see BZ 54022)
                    } else if (processor.isAsync() ||
                            state == SocketState.ASYNC_END) {
                        state = processor.asyncDispatch(status);
                    } else if (processor.isComet()) {
                        state = processor.event(status);
                    } else if (processor.isUpgrade()) {
                        state = processor.upgradeDispatch();
                    } else {
                        state = processor.process(socket);
                    }
    
                    if (state != SocketState.CLOSED && processor.isAsync()) {
                        state = processor.asyncPostProcess();
                    }

                    if (state == SocketState.UPGRADING) {
                        // Get the UpgradeInbound handler
                        UpgradeInbound inbound = processor.getUpgradeInbound();
                        // Release the Http11 processor to be re-used
                        release(socket, processor, false, false);
                        // Create the light-weight upgrade processor
                        processor = createUpgradeProcessor(socket, inbound);
                        inbound.onUpgradeComplete();
                    }
                } while (state == SocketState.ASYNC_END ||
                        state == SocketState.UPGRADING);

                if (state == SocketState.LONG) {
                    // In the middle of processing a request/response. Keep the
                    // socket associated with the processor. Exact requirements
                    // depend on type of long poll
                    longPoll(socket, processor);
                } else if (state == SocketState.OPEN) {
                    // In keep-alive but between requests. OK to recycle
                    // processor. Continue to poll for the next request.
                    release(socket, processor, false, true);
                } else if (state == SocketState.SENDFILE) {
                    // Sendfile in progress. If it fails, the socket will be
                    // closed. If it works, the socket will be re-added to the
                    // poller
                    release(socket, processor, false, false);
                } else if (state == SocketState.UPGRADED) {
                    // Need to keep the connection associated with the processor
                    longPoll(socket, processor);
                } else {
                    // Connection closed. OK to recycle the processor.
                    if (!(processor instanceof UpgradeProcessor)) {
                        release(socket, processor, true, false);
                    }
                }
                return state;
            } catch(java.net.SocketException e) {
                // SocketExceptions are normal
                getLog().debug(sm.getString(
                        "abstractConnectionHandler.socketexception.debug"), e);
            } catch (java.io.IOException e) {
                // IOExceptions are normal
                getLog().debug(sm.getString(
                        "abstractConnectionHandler.ioexception.debug"), e);
            }
            // Future developers: if you discover any other
            // rare-but-nonfatal exceptions, catch them here, and log as
            // above.
            catch (Throwable e) {
                ExceptionUtils.handleThrowable(e);
                // any other exception or error is odd. Here we log it
                // with "ERROR" level, so it will show up even on
                // less-than-verbose logs.
                getLog().error(
                        sm.getString("abstractConnectionHandler.error"), e);
            }
            // Don't try to add upgrade processors back into the pool
            if (!(processor instanceof UpgradeProcessor)) {
                release(socket, processor, true, false);
            }
            return SocketState.CLOSED;
        }

重点在第38行,调用processor的process方法处理socket。而processor对象在第18行通过createProcessor方法创建出来的,createProcessor方法在当前类里面是抽象方法,默认情况下调用的具体实现类在上面提到的Http11ConnectionHandler类中:

        @Override
        protected Http11Processor createProcessor() {
            Http11Processor processor = new Http11Processor(
                    proto.getMaxHttpHeaderSize(), (JIoEndpoint)proto.endpoint,
                    proto.getMaxTrailerSize());
            processor.setAdapter(proto.adapter);
            processor.setMaxKeepAliveRequests(proto.getMaxKeepAliveRequests());
            processor.setKeepAliveTimeout(proto.getKeepAliveTimeout());
            processor.setConnectionUploadTimeout(
                    proto.getConnectionUploadTimeout());
            processor.setDisableUploadTimeout(proto.getDisableUploadTimeout());
            processor.setCompressionMinSize(proto.getCompressionMinSize());
            processor.setCompression(proto.getCompression());
            processor.setNoCompressionUserAgents(proto.getNoCompressionUserAgents());
            processor.setCompressableMimeTypes(proto.getCompressableMimeTypes());
            processor.setRestrictedUserAgents(proto.getRestrictedUserAgents());
            processor.setSocketBuffer(proto.getSocketBuffer());
            processor.setMaxSavePostSize(proto.getMaxSavePostSize());
            processor.setServer(proto.getServer());
            processor.setDisableKeepAlivePercentage(
                    proto.getDisableKeepAlivePercentage());
            register(processor);
            return processor;
        }

此时的processor对象是Http11Processor类的实例,再看上一段提到的processor.process方法,最终会执行到Http11Processor类(因为该类中没有定义process方法)的父类org.apache.coyote.http11.AbstractHttp11Processor中的process方法。

 

为了方便理解,下面的时序图列出从Acceptor线程的run方法到AbstractHttp11Processor类的process方法的关键方法调用过程:

Tomcat7中一次请求处理的前世今生(二)Socket连接转换成内部请求对象
            
    
    博客分类: Tomcat7源码分析Java tomcat源码分析 

 

接下来分析org.apache.coyote.http11.AbstractHttp11Processor类的process方法:

    @Override
    public SocketState process(SocketWrapper<S> socketWrapper)
        throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

        // Setting up the I/O
        setSocketWrapper(socketWrapper);
        getInputBuffer().init(socketWrapper, endpoint);
        getOutputBuffer().init(socketWrapper, endpoint);

        // Flags
        error = false;
        keepAlive = true;
        comet = false;
        openSocket = false;
        sendfileInProgress = false;
        readComplete = true;
        if (endpoint.getUsePolling()) {
            keptAlive = false;
        } else {
            keptAlive = socketWrapper.isKeptAlive();
        }

        if (disableKeepAlive()) {
            socketWrapper.setKeepAliveLeft(0);
        }

        while (!error && keepAlive && !comet && !isAsync() &&
                upgradeInbound == null && !endpoint.isPaused()) {

            // Parsing the request header
            try {
                setRequestLineReadTimeout();

                if (!getInputBuffer().parseRequestLine(keptAlive)) {
                    if (handleIncompleteRequestLineRead()) {
                        break;
                    }
                }

                if (endpoint.isPaused()) {
                    // 503 - Service unavailable
                    response.setStatus(503);
                    error = true;
                } else {
                    // Make sure that connectors that are non-blocking during
                    // header processing (NIO) only set the start time the first
                    // time a request is processed.
                    if (request.getStartTime() < 0) {
                        request.setStartTime(System.currentTimeMillis());
                    }
                    keptAlive = true;
                    // Set this every time in case limit has been changed via JMX
                    request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
                    // Currently only NIO will ever return false here
                    if (!getInputBuffer().parseHeaders()) {
                        // We've read part of the request, don't recycle it
                        // instead associate it with the socket
                        openSocket = true;
                        readComplete = false;
                        break;
                    }
                    if (!disableUploadTimeout) {
                        setSocketTimeout(connectionUploadTimeout);
                    }
                }
            } catch (IOException e) {
                if (getLog().isDebugEnabled()) {
                    getLog().debug(
                            sm.getString("http11processor.header.parse"), e);
                }
                error = true;
                break;
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                UserDataHelper.Mode logMode = userDataHelper.getNextMode();
                if (logMode != null) {
                    String message = sm.getString(
                            "http11processor.header.parse");
                    switch (logMode) {
                        case INFO_THEN_DEBUG:
                            message += sm.getString(
                                    "http11processor.fallToDebug");
                            //$FALL-THROUGH$
                        case INFO:
                            getLog().info(message);
                            break;
                        case DEBUG:
                            getLog().debug(message);
                    }
                }
                // 400 - Bad Request
                response.setStatus(400);
                adapter.log(request, response, 0);
                error = true;
            }

            if (!error) {
                // Setting up filters, and parse some request headers
                rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
                try {
                    prepareRequest();
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    if (getLog().isDebugEnabled()) {
                        getLog().debug(sm.getString(
                                "http11processor.request.prepare"), t);
                    }
                    // 400 - Internal Server Error
                    response.setStatus(400);
                    adapter.log(request, response, 0);
                    error = true;
                }
            }

            if (maxKeepAliveRequests == 1) {
                keepAlive = false;
            } else if (maxKeepAliveRequests > 0 &&
                    socketWrapper.decrementKeepAlive() <= 0) {
                keepAlive = false;
            }

            // Process the request in the adapter
            if (!error) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    adapter.service(request, response);
                    // Handle when the response was committed before a serious
                    // error occurred.  Throwing a ServletException should both
                    // set the status to 500 and set the errorException.
                    // If we fail here, then the response is likely already
                    // committed, so we can't try and set headers.
                    if(keepAlive && !error) { // Avoid checking twice.
                        error = response.getErrorException() != null ||
                                (!isAsync() &&
                                statusDropsConnection(response.getStatus()));
                    }
                    setCometTimeouts(socketWrapper);
                } catch (InterruptedIOException e) {
                    error = true;
                } catch (HeadersTooLargeException e) {
                    error = true;
                    // The response should not have been committed but check it
                    // anyway to be safe
                    if (!response.isCommitted()) {
                        response.reset();
                        response.setStatus(500);
                        response.setHeader("Connection", "close");
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    getLog().error(sm.getString(
                            "http11processor.request.process"), t);
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    adapter.log(request, response, 0);
                    error = true;
                }
            }

            // Finish the handling of the request
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);

            if (!isAsync() && !comet) {
                if (error) {
                    // If we know we are closing the connection, don't drain
                    // input. This way uploading a 100GB file doesn't tie up the
                    // thread if the servlet has rejected it.
                    getInputBuffer().setSwallowInput(false);
                }
                endRequest();
            }

            rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);

            // If there was an error, make sure the request is counted as
            // and error, and update the statistics counter
            if (error) {
                response.setStatus(500);
            }
            request.updateCounters();

            if (!isAsync() && !comet || error) {
                getInputBuffer().nextRequest();
                getOutputBuffer().nextRequest();
            }

            if (!disableUploadTimeout) {
                if(endpoint.getSoTimeout() > 0) {
                    setSocketTimeout(endpoint.getSoTimeout());
                } else {
                    setSocketTimeout(0);
                }
            }

            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

            if (breakKeepAliveLoop(socketWrapper)) {
                break;
            }
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

        if (error || endpoint.isPaused()) {
            return SocketState.CLOSED;
        } else if (isAsync() || comet) {
            return SocketState.LONG;
        } else if (isUpgrade()) {
            return SocketState.UPGRADING;
        } else {
            if (sendfileInProgress) {
                return SocketState.SENDFILE;
            } else {
                if (openSocket) {
                    if (readComplete) {
                        return SocketState.OPEN;
                    } else {
                        return SocketState.LONG;
                    }
                } else {
                    return SocketState.CLOSED;
                }
            }
        }
    }

从这个方法中可以清晰的看出解析请求的过程:第7到10行从Socket中获取输入输出流,第32到97行解析请求行和请求头,第99到115行校验和解析请求头中的属性,第125到160行调用适配器的service方法,第172行请求处理结束。

上面就是根据Http协议解析请求的总体流程。要理解上面提到的请求行、请求头等术语,需要熟悉Http协议,这里简单介绍下Http协议中的标准请求信息数据的格式:

请求信息包括以下三条

  • 请求行(request line)
    • 例如GET /images/logo.gif HTTP/1.1,表示从/images目录下请求logo.gif这个文件。
  • 请求头(request header),空行
    • 例如Accept-Language: en
  • 其他消息体

请求行和标题必须以<CR><LF>作为结尾。空行内必须只有<CR><LF>而无其他空格。在HTTP/1.1协议中,所有的请求头,除Host外,都是可选的。

 

请求行、请求头数据的格式具体看Http协议中的描述。所以在从输入流中读取到字节流数据之后必须按照请求行、请求头、消息体的顺序来解析。

 

这里以请求行数据的解析为例,在Http协议中该行内容格式为:

Request-Line   = Method SP Request-URI SP HTTP-Version CRLF

即请求类型、要访问的资源(URI)以及使用的HTTP版本,中间以特殊字符空格来分隔,以\r\n字符结尾。

 

在上面列出的AbstractHttp11Processor类的process代码中的第36行,会调用抽象方法getInputBuffer(),当前该抽象方法的具体实现在子类org.apache.coyote.http11.Http11Processor中,该方法返回的是该类的实例变量inputBuffer:

    protected AbstractInputBuffer<Socket> getInputBuffer() {
        return inputBuffer;
    }

该实例变量在Http11Processor的构造方法中会被初始化:

    public Http11Processor(int headerBufferSize, JIoEndpoint endpoint,
            int maxTrailerSize) {

        super(endpoint);
        
        inputBuffer = new InternalInputBuffer(request, headerBufferSize);
        request.setInputBuffer(inputBuffer);

        outputBuffer = new InternalOutputBuffer(response, headerBufferSize);
        response.setOutputBuffer(outputBuffer);

        initializeFilters(maxTrailerSize);
    }

所以AbstractHttp11Processor类的process方法的36行getInputBuffer().parseRequestLine()将会调用org.apache.coyote.http11.InternalInputBuffer类中的parseRequestLine方法:

    public boolean parseRequestLine(boolean useAvailableDataOnly)
    
        throws IOException {

        int start = 0;

        //
        // Skipping blank lines
        //

        byte chr = 0;
        do {

            // Read new bytes if needed
            if (pos >= lastValid) {
                if (!fill())
                    throw new EOFException(sm.getString("iib.eof.error"));
            }

            chr = buf[pos++];

        } while ((chr == Constants.CR) || (chr == Constants.LF));

        pos--;

        // Mark the current buffer position
        start = pos;

        //
        // Reading the method name
        // Method name is always US-ASCII
        //

        boolean space = false;

        while (!space) {

            // Read new bytes if needed
            if (pos >= lastValid) {
                if (!fill())
                    throw new EOFException(sm.getString("iib.eof.error"));
            }

            // Spec says no CR or LF in method name
            if (buf[pos] == Constants.CR || buf[pos] == Constants.LF) {
                throw new IllegalArgumentException(
                        sm.getString("iib.invalidmethod"));
            }
            // Spec says single SP but it also says be tolerant of HT
            if (buf[pos] == Constants.SP || buf[pos] == Constants.HT) {
                space = true;
                request.method().setBytes(buf, start, pos - start);
            }

            pos++;

        }

        
        // Spec says single SP but also says be tolerant of multiple and/or HT
        while (space) {
            // Read new bytes if needed
            if (pos >= lastValid) {
                if (!fill())
                    throw new EOFException(sm.getString("iib.eof.error"));
            }
            if (buf[pos] == Constants.SP || buf[pos] == Constants.HT) {
                pos++;
            } else {
                space = false;
            }
        }

        // Mark the current buffer position
        start = pos;
        int end = 0;
        int questionPos = -1;

        //
        // Reading the URI
        //

        boolean eol = false;

        while (!space) {

            // Read new bytes if needed
            if (pos >= lastValid) {
                if (!fill())
                    throw new EOFException(sm.getString("iib.eof.error"));
            }

            // Spec says single SP but it also says be tolerant of HT
            if (buf[pos] == Constants.SP || buf[pos] == Constants.HT) {
                space = true;
                end = pos;
            } else if ((buf[pos] == Constants.CR) 
                       || (buf[pos] == Constants.LF)) {
                // HTTP/0.9 style request
                eol = true;
                space = true;
                end = pos;
            } else if ((buf[pos] == Constants.QUESTION) 
                       && (questionPos == -1)) {
                questionPos = pos;
            }

            pos++;

        }

        request.unparsedURI().setBytes(buf, start, end - start);
        if (questionPos >= 0) {
            request.queryString().setBytes(buf, questionPos + 1, 
                                           end - questionPos - 1);
            request.requestURI().setBytes(buf, start, questionPos - start);
        } else {
            request.requestURI().setBytes(buf, start, end - start);
        }

        // Spec says single SP but also says be tolerant of multiple and/or HT
        while (space) {
            // Read new bytes if needed
            if (pos >= lastValid) {
                if (!fill())
                    throw new EOFException(sm.getString("iib.eof.error"));
            }
            if (buf[pos] == Constants.SP || buf[pos] == Constants.HT) {
                pos++;
            } else {
                space = false;
            }
        }

        // Mark the current buffer position
        start = pos;
        end = 0;

        //
        // Reading the protocol
        // Protocol is always US-ASCII
        //

        while (!eol) {

            // Read new bytes if needed
            if (pos >= lastValid) {
                if (!fill())
                    throw new EOFException(sm.getString("iib.eof.error"));
            }

            if (buf[pos] == Constants.CR) {
                end = pos;
            } else if (buf[pos] == Constants.LF) {
                if (end == 0)
                    end = pos;
                eol = true;
            }

            pos++;

        }

        if ((end - start) > 0) {
            request.protocol().setBytes(buf, start, end - start);
        } else {
            request.protocol().setString("");
        }
        
        return true;

    }

先看这个方法中第16行,调用了当前类的fill方法:

    protected boolean fill() throws IOException {
        return fill(true);
    }

里面调用了重载方法fill:

    protected boolean fill(boolean block) throws IOException {

        int nRead = 0;

        if (parsingHeader) {

            if (lastValid == buf.length) {
                throw new IllegalArgumentException
                    (sm.getString("iib.requestheadertoolarge.error"));
            }

            nRead = inputStream.read(buf, pos, buf.length - lastValid);
            if (nRead > 0) {
                lastValid = pos + nRead;
            }

        } else {

            if (buf.length - end < 4500) {
                // In this case, the request header was really large, so we allocate a 
                // brand new one; the old one will get GCed when subsequent requests
                // clear all references
                buf = new byte[buf.length];
                end = 0;
            }
            pos = end;
            lastValid = pos;
            nRead = inputStream.read(buf, pos, buf.length - lastValid);
            if (nRead > 0) {
                lastValid = pos + nRead;
            }

        }

        return (nRead > 0);

    }

在这里可以看到从输入流中读取数据到缓冲区buf。按照上面列出的请求行数据格式,从字符流中将会按顺序得到请求的类型(method)、请求的URI和Http版本。具体实现流程如下:

在org.apache.coyote.http11.InternalInputBuffer类中的parseRequestLine方法,第34到57行根据请求头协议的格式,从中取出表示请求方法的字节数据并设置到内置实例变量request。第60到72行解析method和uri之间的空格字节SP,第83到119行读取表示请求的URI的字节数据并放到request变量中。第122到133行解析uri和http协议版本之间的空格字节SP,第144到第168行读取表示请求的Http协议版本的字节数据并放到request变量中。

 

以上是根据Http协议解析请求行(request line)的代码实现部分,解析请求头的部分见InternalInputBuffer类的parseHeader方法,不再赘述。

 

至此可以看到在Tomcat中如何从一次Socket连接中取出请求的数据,将这些原始的字符流数据转换成初步可以理解的Tomcat内置对象org.apache.coyote.Request的。下一篇文章将会看到已经转换成内部变量的请求对象在Tomcat容器中的流转经过,如何一步一步将请求送到最终要执行的某个web应用中的某个servlet对象的service方法中的。

 

  • Tomcat7中一次请求处理的前世今生(二)Socket连接转换成内部请求对象
            
    
    博客分类: Tomcat7源码分析Java tomcat源码分析 
  • 大小: 61.4 KB
相关标签: tomcat 源码分析