
Tomcat Source Code Analysis (8): HTTP Request Processing (Part 1)

程序员文章站 2022-07-01 23:09:37

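We have finally reached the Connector, which is also the most complex part of Tomcat. As the name suggests, a Connector connects things. What exactly does it connect?

The Connector accepts requests, wraps each one in a Request and a Response, and hands them to the Container for processing. When the Container has finished, the result goes back through the Connector to the client.

To understand the Connector, we need to ask ourselves four questions.

  • (1) How does the Connector accept requests?
  • (2) How is a request wrapped into a Request and a Response?
  • (3) How are the wrapped Request and Response handed to the Container for processing?
  • (4) How does the Container hand the result back to the Connector, and how is it returned to the client?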

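Let's start with a diagram of the Connector's overall structure.

[Figure: overall structure of the Connector]

[Note]: ProtocolHandler has different implementations for different protocols and different I/O modes. The figure below shows the ProtocolHandler class hierarchy in Tomcat 8.5.

[Figure: ProtocolHandler class hierarchy in Tomcat 8.5]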

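A few notes on the class hierarchy above:

  1. AJP and HTTP/1.1 are two different protocols.
  2. NIO, NIO2 and APR are different I/O modes.
  3. Protocols and I/O modes can be combined with each other.

A ProtocolHandler contains three components: Endpoint, Processor and Adapter.

  1. The Endpoint handles the low-level socket network connections, the Processor wraps the socket received by the Endpoint into a Request, and the Adapter hands the Request to the Container for the actual processing.
  2. Because the Endpoint deals with low-level socket connections, it implements the TCP/IP layer. The Processor implements the HTTP protocol, and the Adapter adapts the request to the Servlet container.
  3. AbstractEndpoint, the abstract implementation of Endpoint, defines the Acceptor inner class and the Handler interface. Acceptor listens for incoming connections, and Handler processes each received socket by calling a Processor internally. AsyncTimeout checks asynchronous requests for timeouts; in the Tomcat 8.5 code shown later in this article it is created in the ProtocolHandler's start() method.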
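The division of labor between the three components can be sketched with a toy pipeline. This is only an illustration under simplifying assumptions, not Tomcat code: `ConnectorPipelineSketch`, `SimpleRequest`, `process`, `adapt` and `handle` are hypothetical names, the "socket" is a string that has already been read, and the container is any function from request to response text.

```java
import java.util.function.Function;

public class ConnectorPipelineSketch {

    /** Hypothetical stand-in for the Request object built by the Processor. */
    public static final class SimpleRequest {
        public final String method;
        public final String path;

        public SimpleRequest(String method, String path) {
            this.method = method;
            this.path = path;
        }
    }

    /** Processor role: parse the raw request line into a request object. */
    public static SimpleRequest process(String rawRequestLine) {
        String[] parts = rawRequestLine.trim().split(" ");
        return new SimpleRequest(parts[0], parts[1]);
    }

    /** Adapter role: pass the request to the container and return its response. */
    public static String adapt(SimpleRequest request, Function<SimpleRequest, String> container) {
        return container.apply(request);
    }

    /** Endpoint role: receives the raw data and drives Processor and Adapter. */
    public static String handle(String rawRequestLine, Function<SimpleRequest, String> container) {
        return adapt(process(rawRequestLine), container);
    }

    public static void main(String[] args) {
        String response = handle("GET /index.html HTTP/1.1",
                req -> "200 OK for " + req.method + " " + req.path);
        System.out.println(response);
    }
}
```

The point of the split is that each role can vary independently: a different I/O mode changes only the Endpoint role, and a different protocol changes only the Processor role.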

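At this point questions (1), (2) and (3) are answered in outline. Question (4) answers itself once you understand the Container, which earlier chapters analyzed in detail.

Entry point of the Connector source code

In StandardService, the standard implementation of Service, the init(), start(), stop() and destroy() methods each call the method of the same name on the Service's Connectors. One Service can have multiple Connectors.

Service.init()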

@Override
protected void initInternal() throws LifecycleException {
    super.initInternal();

    if (engine != null) {
        engine.init();
    }

    // Initialize any Executors
    for (Executor executor : findExecutors()) {
        if (executor instanceof JmxEnabled) {
            ((JmxEnabled) executor).setDomain(getDomain());
        }
        executor.init();
    }

    // Initialize mapper listener
    mapperListener.init();

    // Initialize our defined Connectors
    synchronized (connectorsLock) {
        for (Connector connector : connectors) {
            try {
                connector.init();
            } catch (Exception e) {
                String message = sm.getString(
                        "standardService.connector.initFailed", connector);
                log.error(message, e);

                if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"))
                    throw new LifecycleException(message);
            }
        }
    }
}

Service.start()

@Override
protected void startInternal() throws LifecycleException {
    if(log.isInfoEnabled())
        log.info(sm.getString("standardService.start.name", this.name));
    setState(LifecycleState.STARTING);

    // Start our defined Container first
    if (engine != null) {
        synchronized (engine) {
            engine.start();
        }
    }

    synchronized (executors) {
        for (Executor executor: executors) {
            executor.start();
        }
    }

    mapperListener.start();

    // Start our defined Connectors second
    synchronized (connectorsLock) {
        for (Connector connector: connectors) {
            try {
                // If it has already failed, don't try and start it
                if (connector.getState() != LifecycleState.FAILED) {
                    connector.start();
                }
            } catch (Exception e) {
                log.error(sm.getString(
                        "standardService.connector.startFailed",
                        connector), e);
            }
        }
    }
}

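Connector implements the Lifecycle interface, so it is a lifecycle component, and its startup logic begins in init() and start().

Connector constructor

Before the analysis, let's look at server.xml, which already reflects the overall structure of Tomcat's components.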

<?xml version='1.0' encoding='utf-8'?>
<Server port="8005" shutdown="SHUTDOWN">
  <Listener className="org.apache.catalina.startup.VersionLoggerListener" />
  <Listener className="org.apache.catalina.core.AprLifecycleListener" SSLEngine="on" />
  <Listener className="org.apache.catalina.core.JreMemoryLeakPreventionListener" />
  <Listener className="org.apache.catalina.mbeans.GlobalResourcesLifecycleListener" />
  <Listener className="org.apache.catalina.core.ThreadLocalLeakPreventionListener" />

  <GlobalNamingResources>
    <Resource name="UserDatabase" auth="Container"
              type="org.apache.catalina.UserDatabase"
              description="User database that can be updated and saved"
              factory="org.apache.catalina.users.MemoryUserDatabaseFactory"
              pathname="conf/tomcat-users.xml" />
  </GlobalNamingResources>

  <Service name="Catalina">
    <Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" />
    <Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />

    <Engine name="Catalina" defaultHost="localhost">
      <Realm className="org.apache.catalina.realm.LockOutRealm">
        <Realm className="org.apache.catalina.realm.UserDatabaseRealm"
               resourceName="UserDatabase"/>
      </Realm>

      <Host name="localhost"  appBase="webapps"
            unpackWARs="true" autoDeploy="true">
        <Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
               prefix="localhost_access_log" suffix=".txt"
               pattern="%h %l %u %t &quot;%r&quot; %s %b" />
      </Host>
    </Engine>
  </Service>
</Server>

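In this file a Connector has several key attributes; port and protocol are two of them. The default server.xml configures two protocols, HTTP/1.1 and AJP/1.3. HTTP/1.1 supports the HTTP 1.1 protocol, while AJP/1.3 supports communication with the Apache HTTP server.

Next, let's look at the constructor.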

public Connector() {
    this(null); // 1. No-arg constructor: passes a null protocol, which defaults to `HTTP/1.1`
}

public Connector(String protocol) {
    setProtocol(protocol);
    // Instantiate protocol handler
    // 5. Construct the ProtocolHandler instance from its class name
    ProtocolHandler p = null;
    try {
        Class<?> clazz = Class.forName(protocolHandlerClassName);
        p = (ProtocolHandler) clazz.getConstructor().newInstance();
    } catch (Exception e) {
        log.error(sm.getString(
                "coyoteConnector.protocolHandlerInstantiationFailed"), e);
    } finally {
        this.protocolHandler = p;
    }

    if (Globals.STRICT_SERVLET_COMPLIANCE) {
        uriCharset = StandardCharsets.ISO_8859_1;
    } else {
        uriCharset = StandardCharsets.UTF_8;
    }
}

@Deprecated
public void setProtocol(String protocol) {
    boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
            AprLifecycleListener.getUseAprConnector();

    // 2. `HTTP/1.1` or `null`: the ProtocolHandler is `org.apache.coyote.http11.Http11NioProtocol` (leaving APR aside)
    if ("HTTP/1.1".equals(protocol) || protocol == null) {
        if (aprConnector) {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");
        } else {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");
        }
    }
    // 3. `AJP/1.3`: the ProtocolHandler is `org.apache.coyote.ajp.AjpNioProtocol` (leaving APR aside)
    else if ("AJP/1.3".equals(protocol)) {
        if (aprConnector) {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");
        } else {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");
        }
    }
    // 4. Otherwise, use the protocol value passed in as the ProtocolHandler class name
    else {
        setProtocolHandlerClassName(protocol);
    }
}

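From the code above, the constructor mainly does the following:

  1. The no-arg constructor passes a null protocol, which defaults to HTTP/1.1.
  2. For HTTP/1.1 or null, the ProtocolHandler is org.apache.coyote.http11.Http11NioProtocol (leaving APR aside).
  3. For AJP/1.3, the ProtocolHandler is org.apache.coyote.ajp.AjpNioProtocol (leaving APR aside).
  4. Otherwise, the protocol value passed in is used as the ProtocolHandler class name.
  5. An instance of the ProtocolHandler is constructed from that class name.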
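The selection rules in steps 1 to 4 can be condensed into a single pure function. The following is a minimal sketch that mirrors the branching of setProtocol() shown above; `ProtocolResolverSketch` and `resolveHandlerClassName` are hypothetical names, and the `aprAvailable` flag stands in for the AprLifecycleListener checks. It only returns class names as strings and does not load any Tomcat class.

```java
public class ProtocolResolverSketch {

    /**
     * Maps the protocol attribute to a ProtocolHandler class name,
     * following the same branches as the setProtocol() code above.
     */
    public static String resolveHandlerClassName(String protocol, boolean aprAvailable) {
        if (protocol == null || "HTTP/1.1".equals(protocol)) {
            return aprAvailable
                    ? "org.apache.coyote.http11.Http11AprProtocol"
                    : "org.apache.coyote.http11.Http11NioProtocol";
        }
        if ("AJP/1.3".equals(protocol)) {
            return aprAvailable
                    ? "org.apache.coyote.ajp.AjpAprProtocol"
                    : "org.apache.coyote.ajp.AjpNioProtocol";
        }
        // Any other value is treated as a fully qualified class name.
        return protocol;
    }

    public static void main(String[] args) {
        System.out.println(resolveHandlerClassName(null, false));
        System.out.println(resolveHandlerClassName("AJP/1.3", false));
        System.out.println(resolveHandlerClassName("org.apache.coyote.http11.Http11Nio2Protocol", false));
    }
}
```

The last branch is what makes it possible to write a fully qualified handler class name directly in the protocol attribute of server.xml.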

Connector.init()

@Override
protected void initInternal() throws LifecycleException {
    super.initInternal();

    // 1. Initialize adapter
    adapter = new CoyoteAdapter(this);
    protocolHandler.setAdapter(adapter);

    // Make sure parseBodyMethodsSet has a default
    // 2. Set the list of methods whose request body is parsed; the default is POST
    if (null == parseBodyMethodsSet) {
        setParseBodyMethods(getParseBodyMethods());
    }

    if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
        throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
                getProtocolHandlerClassName()));
    }
    if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
            protocolHandler instanceof AbstractHttp11JsseProtocol) {
        AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
                (AbstractHttp11JsseProtocol<?>) protocolHandler;
        if (jsseProtocolHandler.isSSLEnabled() &&
                jsseProtocolHandler.getSslImplementationName() == null) {
            // OpenSSL is compatible with the JSSE configuration, so use it if APR is available
            jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
        }
    }

    // 3. Initialize the ProtocolHandler
    try {
        protocolHandler.init();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
    }
}

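init() does three things:

  1. Initializes the adapter
  2. Sets the list of HTTP methods whose request body is parsed (POST by default)
  3. Initializes the ProtocolHandler

From the ProtocolHandler class hierarchy we know that every ProtocolHandler implementation extends the abstract class AbstractProtocol, and the logic of protocolHandler.init() lives in that abstract class. Let's analyze it.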

@Override
public void init() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
    }

    if (oname == null) {
        // Component not pre-registered so register it
        oname = createObjectName();
        if (oname != null) {
            Registry.getRegistry(null, null).registerComponent(this, oname, null);
        }
    }

    if (this.domain != null) {
        rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
        Registry.getRegistry(null, null).registerComponent(
                getHandler().getGlobal(), rgOname, null);
    }

    // 1. Set the endpoint name, by default http-nio-{port}
    String endpointName = getName();
    endpoint.setName(endpointName.substring(1, endpointName.length()-1));
    endpoint.setDomain(domain);

    // 2. Initialize the endpoint
    endpoint.init();
}

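Next, let's see what endpoint.init() does. The method lives in the abstract class AbstractEndpoint, which is built on the template method pattern, and it mainly calls the subclass's bind() method.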

public abstract void bind() throws Exception;
public abstract void unbind() throws Exception;
public abstract void startInternal() throws Exception;
public abstract void stopInternal() throws Exception;

public void init() throws Exception {
    // Call bind()
    if (bindOnInit) {
        bind();
        bindState = BindState.BOUND_ON_INIT;
    }
    if (this.domain != null) {
        // Register endpoint (as ThreadPool - historical name)
        oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
        Registry.getRegistry(null, null).registerComponent(this, oname, null);

        ObjectName socketPropertiesOname = new ObjectName(domain +
                ":type=ThreadPool,name=\"" + getName() + "\",subType=SocketProperties");
        socketProperties.setObjectName(socketPropertiesOname);
        Registry.getRegistry(null, null).registerComponent(socketProperties, socketPropertiesOname, null);

        for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
            registerJmx(sslHostConfig);
        }
    }
}

Moving on to bind(), we finally reach what we were looking for. The key line is serverSock.socket().bind(addr,getAcceptCount());, which binds the ServerSocket to the specified IP address and port.

@Override
public void bind() throws Exception {

    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
        // Bind the ServerSocket to the specified IP address and port
        serverSock.socket().bind(addr,getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }

    serverSock.configureBlocking(true); //mimic APR behavior

    // Initialize thread count defaults for acceptor, poller
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn't seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    if (pollerThreadCount <= 0) {
        //minimum one poller thread
        pollerThreadCount = 1;
    }
    setStopLatch(new CountDownLatch(pollerThreadCount));

    // Initialize SSL if needed
    initialiseSsl();

    selectorPool.open();
}
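The binding step can be tried in isolation with plain java.nio. The sketch below is not Tomcat code: `BindSketch` and its `bind` helper are hypothetical, it binds to the loopback address only, and port 0 asks the operating system for any free port. The `backlog` argument plays the role that getAcceptCount() plays in the code above.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class BindSketch {

    /** Opens a server channel and binds it, as the non-inherited branch above does. */
    public static ServerSocketChannel bind(int port, int backlog) throws IOException {
        ServerSocketChannel serverSock = ServerSocketChannel.open();
        InetSocketAddress addr = new InetSocketAddress(InetAddress.getLoopbackAddress(), port);
        // The second argument is the accept backlog requested from the OS.
        serverSock.socket().bind(addr, backlog);
        // The accepting channel stays in blocking mode, so accept() waits for a client.
        serverSock.configureBlocking(true);
        return serverSock;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel ch = bind(0, 100)) {
            System.out.println("listening on port " + ch.socket().getLocalPort());
        }
    }
}
```

Keeping the server channel blocking is a reasonable fit for a dedicated accepting thread: it has nothing else to do until a connection arrives.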

That completes init(). Next we analyze start(). There is only one key line: the call to protocolHandler.start().

Connector.start()

@Override
protected void startInternal() throws LifecycleException {

    // Validate settings before starting
    if (getPort() < 0) {
        throw new LifecycleException(sm.getString(
                "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
    }

    setState(LifecycleState.STARTING);

    try {
        protocolHandler.start();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
    }
}

Let's step into protocolHandler.start().

  1. It calls endpoint.start()
  2. It starts the async timeout thread, whose task is an AsyncTimeout

@Override
public void start() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
    }

    // 1. Call `endpoint.start()`
    endpoint.start();

    // Start async timeout thread
    // 2. The thread's task is an `AsyncTimeout`
    asyncTimeout = new AsyncTimeout();
    Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
    int priority = endpoint.getThreadPriority();
    if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
        priority = Thread.NORM_PRIORITY;
    }
    timeoutThread.setPriority(priority);
    timeoutThread.setDaemon(true);
    timeoutThread.start();
}

Here we focus on endpoint.start().

public final void start() throws Exception {
    // 1. `bind()` was already analyzed as part of `init()`
    if (bindState == BindState.UNBOUND) {
        bind();
        bindState = BindState.BOUND_ON_START;
    }
    startInternal();
}

@Override
public void startInternal() throws Exception {
    if (!running) {
        running = true;
        paused = false;

        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getEventCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());

        // Create worker collection
        // 2. Create the worker thread pool
        if ( getExecutor() == null ) {
            createExecutor();
        }

        // 3. Initialize the connection latch, which limits the number of concurrent connections
        initializeConnectionLatch();

        // Start poller threads
        // 4. A Poller processes the events produced by the Acceptor thread and ultimately calls the Handler's code
        pollers = new Poller[getPollerThreadCount()];
        for (int i=0; i<pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }
        // 5. Start the Acceptor threads
        startAcceptorThreads();
    }
}

protected final void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new Acceptor[count];

    for (int i = 0; i < count; i++) {
        acceptors[i] = createAcceptor();
        String threadName = getName() + "-Acceptor-" + i;
        acceptors[i].setThreadName(threadName);
        Thread t = new Thread(acceptors[i], threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}

  1. bind() was already analyzed as part of init()
  2. Create the worker thread pool
  3. Initialize the connection latch, which limits the number of concurrent connections
  4. Create the Poller threads. A Poller processes the events produced by the Acceptor thread and ultimately calls the Handler's code
  5. Create the Acceptor threads

Connector request logic

Having analyzed the Connector's startup logic, we now turn to HTTP request handling: after a client sends a request, which steps does it go through before it is actually executed?

Acceptor

The Acceptor thread listens on the server socket and passes each connected socket to a Poller thread. The number of Acceptor threads is controlled by the acceptorThreadCount field of AbstractEndpoint and defaults to 1.

AbstractEndpoint.Acceptor is a static abstract nested class of AbstractEndpoint that implements Runnable. Part of its code is shown below:

public abstract static class Acceptor implements Runnable {
    public enum AcceptorState {
        NEW, RUNNING, PAUSED, ENDED
    }

    protected volatile AcceptorState state = AcceptorState.NEW;
    public final AcceptorState getState() {
        return state;
    }

    private String threadName;
    protected final void setThreadName(final String threadName) {
        this.threadName = threadName;
    }
    protected final String getThreadName() {
        return threadName;
    }
}

The Acceptor member inner class of NioEndpoint extends AbstractEndpoint.Acceptor:

protected class Acceptor extends AbstractEndpoint.Acceptor {
    @Override
    public void run() {
        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (running) {

            // Loop if endpoint is paused
            // 1. While the endpoint is paused, the Acceptor spins, sleeping 50 ms per iteration
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
            // 2. If the endpoint has stopped running, the Acceptor stops too
            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;

            try {
                //if we have reached max connections, wait
                // 3. If the maximum number of connections has been reached, wait until the count drops
                countUpOrAwaitConnection();

                SocketChannel socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    // 4. Accept the next incoming connection
                    socket = serverSock.accept();
                } catch (IOException ioe) {
                    // We didn't get a socket
                    countDownConnection();
                    if (running) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket
                if (running && !paused) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    // 5. `setSocketOptions()` is the key step: it passes the socket to a Poller as an event
                    if (!setSocketOptions(socket)) {
                        closeSocket(socket);
                    }
                } else {
                    closeSocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        }
        state = AcceptorState.ENDED;
    }
}

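From the code above we can see:

  • countUpOrAwaitConnection checks the current connection count: if it is below maxConnections the count is incremented, otherwise the thread waits;
  • In socket = serverSock.accept(), serverSock is the ServerSocketChannel opened in NioEndpoint's bind method. To reference this field, NioEndpoint's Acceptor is a member (inner) class rather than a static one;
  • The comment above the setSocketOptions call indicates that this method hands the connected socket to a Poller thread.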
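The count-up-or-wait behaviour described in the first bullet can be sketched with a standard java.util.concurrent.Semaphore. This is a swapped-in technique for illustration: Tomcat uses its own latch class for this, and `ConnectionLimitSketch` together with its `tryCountUp` helper are hypothetical names.

```java
import java.util.concurrent.Semaphore;

public class ConnectionLimitSketch {

    // One permit per allowed connection.
    private final Semaphore permits;

    public ConnectionLimitSketch(int maxConnections) {
        this.permits = new Semaphore(maxConnections);
    }

    /** Blocks the accepting thread while the connection limit is reached. */
    public void countUpOrAwaitConnection() throws InterruptedException {
        permits.acquire();
    }

    /** Non-blocking variant, added only so the sketch is easy to exercise. */
    public boolean tryCountUp() {
        return permits.tryAcquire();
    }

    /** Called when a connection is closed or an accept fails. */
    public void countDownConnection() {
        permits.release();
    }

    public static void main(String[] args) {
        ConnectionLimitSketch limit = new ConnectionLimitSketch(2);
        System.out.println(limit.tryCountUp()); // true
        System.out.println(limit.tryCountUp()); // true
        System.out.println(limit.tryCountUp()); // false, limit of 2 reached
        limit.countDownConnection();
        System.out.println(limit.tryCountUp()); // true again after one release
    }
}
```

This also shows why the Acceptor code above calls countDownConnection() when accept() throws: the slot was reserved before the accept, so it has to be given back.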

setSocketOptions then processes the connected socket:

protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        //disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);

        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        // Register the channel with a Poller; note the two key methods, `getPoller0()` and `Poller.register()`
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error("",t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}

  • An NioChannel is popped from the nioChannels stack: if one is available (not null) the object is reused, otherwise a new NioChannel is created;
  • getPoller0 picks a Poller thread by round robin, and Poller's register method registers the NioChannel with that Poller;
  • The method returns true if the hand-off to the Poller succeeds and false otherwise. When it returns false, the Acceptor's closeSocket method closes the channel and the underlying socket and decrements the current connection count.

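Poller

The Poller thread polls the connected sockets with few resources to keep the connections alive, and hands a socket to a worker thread when data is available.

The number of Poller threads is controlled by the pollerThreadCount field of NioEndpoint; its default is the smaller of 2 and the number of available processors.
Poller implements Runnable, and its constructor opens a new Selector for each Poller.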

public class Poller implements Runnable {
    private Selector selector;
    private final SynchronizedQueue<PollerEvent> events =
            new SynchronizedQueue<>();
    // some code omitted
    public Poller() throws IOException {
        this.selector = Selector.open();
    }

    public Selector getSelector() { return selector;}
    // some code omitted
}

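To register a channel with a Poller, two methods matter: getPoller0() and Poller.register(). Let's look at getPoller0() first. Its key point is that it cycles through the pollers round-robin, taking a counter modulo the number of pollers.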

/**
 * The socket poller.
 */
private Poller[] pollers = null;
private AtomicInteger pollerRotater = new AtomicInteger(0);
/**
 * Return an available poller in true round robin fashion.
 *
 * @return The next poller in sequence
 */
public Poller getPoller0() {
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}
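The same rotation can be tried outside Tomcat. The sketch below is a variant, not the code above: `RoundRobinSketch` is a hypothetical class, strings stand in for Poller objects, and Math.floorMod replaces the Math.abs(...) % length expression. The swap is deliberate: in Java, Math.abs(Integer.MIN_VALUE) is still negative, so after the counter overflows the abs-based index could become negative for some array lengths, whereas floorMod always yields a value in [0, length).

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSketch {

    private final String[] pollers;
    private final AtomicInteger rotater = new AtomicInteger(0);

    public RoundRobinSketch(String... pollers) {
        this.pollers = pollers;
    }

    /** Returns the next poller in sequence; safe to call from several threads. */
    public String next() {
        // incrementAndGet() is atomic; floorMod keeps the index non-negative
        // even after the int counter wraps around.
        int idx = Math.floorMod(rotater.incrementAndGet(), pollers.length);
        return pollers[idx];
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch("poller-0", "poller-1", "poller-2");
        for (int i = 0; i < 4; i++) {
            System.out.println(rr.next());
        }
    }
}
```

Because the counter is incremented before it is used, the first call returns the element at index 1 rather than index 0, which is harmless for load spreading.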

Next we analyze Poller.register(). Each Poller maintains a synchronized queue named events, so the channel accepted by the Acceptor is placed in this queue; the enqueueing code is events.offer(event);

public class Poller implements Runnable {

    private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

    /**
     * Registers a newly created socket with the poller.
     *
     * @param socket    The newly created socket
     */
    public void register(final NioChannel socket) {
        socket.setPoller(this);
        NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
        socket.setSocketWrapper(ka);
        ka.setPoller(this);
        ka.setReadTimeout(getSocketProperties().getSoTimeout());
        ka.setWriteTimeout(getSocketProperties().getSoTimeout());
        ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
        ka.setSecure(isSSLEnabled());
        ka.setReadTimeout(getConnectionTimeout());
        ka.setWriteTimeout(getConnectionTimeout());
        PollerEvent r = eventCache.pop();
        ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
        if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
        else r.reset(socket,ka,OP_REGISTER);
        addEvent(r);
    }

    private void addEvent(PollerEvent event) {
        events.offer(event);
        if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
    }
}

PollerEvent

Next, PollerEvent. It implements Runnable and represents a polling event. The code is as follows:

public static class PollerEvent implements Runnable {
    private NioChannel socket;
    private int interestOps;
    private NioSocketWrapper socketWrapper;

    public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
        reset(ch, w, intOps);
    }

    public void reset(NioChannel ch, NioSocketWrapper w, int intOps) {
        socket = ch;
        interestOps = intOps;
        socketWrapper = w;
    }

    public void reset() {
        reset(null, null, 0);
    }

    @Override
    public void run() {
        if (interestOps == OP_REGISTER) {
            try {
                socket.getIOChannel().register(
                        socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
            } catch (Exception x) {
                log.error(sm.getString("endpoint.nio.registerFail"), x);
            }
        } else {
            final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
            try {
                if (key == null) {
                    socket.socketWrapper.getEndpoint().countDownConnection();
                    ((NioSocketWrapper) socket.socketWrapper).closed = true;
                } else {
                    final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                    if (socketWrapper != null) {
                        //we are registering the key to start with, reset the fairness counter.
                        int ops = key.interestOps() | interestOps;
                        socketWrapper.interestOps(ops);
                        key.interestOps(ops);
                    } else {
                        socket.getPoller().cancelledKey(key);
                    }
                }
            } catch (CancelledKeyException ckx) {
                try {
                    socket.getPoller().cancelledKey(key);
                } catch (Exception ignore) {}
            }
        }
    }

}

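In the run method:

  • If the interest set is the custom OP_REGISTER, the connected socket channel represented by this event has not yet been handled by a poller thread, so the channel is registered with the Poller's Selector with interest set OP_READ and a NioSocketWrapper object as the attachment. Events added through Poller's register method follow this path;
  • Otherwise, the method obtains the SelectionKey with which the connected socket channel is registered on the Poller's Selector and adds the new interest set to the key.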
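Registering a channel with an attachment and getting that attachment back from the ready key is plain java.nio behaviour, so it can be demonstrated without Tomcat. The sketch below makes several assumptions for the sake of a self-contained run: `SelectorAttachmentSketch` and `registerAndSelect` are hypothetical names, the client and server both live in the same thread and talk over the loopback interface, and an arbitrary object stands in for the NioSocketWrapper.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class SelectorAttachmentSketch {

    /** Returns the attachment of the key that becomes readable, or null on timeout. */
    public static Object registerAndSelect(Object attachment) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                // A channel must be non-blocking before it can be registered.
                accepted.configureBlocking(false);
                // Same shape as the OP_REGISTER branch: interest OP_READ plus an attachment.
                accepted.register(selector, SelectionKey.OP_READ, attachment);

                // Make the accepted channel readable.
                client.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.US_ASCII)));

                if (selector.select(5000) == 0) {
                    return null;
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                SelectionKey key = it.next();
                it.remove();
                return key.isReadable() ? key.attachment() : null;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Object wrapper = "socket-wrapper-stand-in";
        System.out.println(registerAndSelect(wrapper) == wrapper);
    }
}
```

The attachment is what lets the selecting thread go from a bare SelectionKey back to the per-connection state without any lookup table.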

Poller revisited

As mentioned above, Poller implements Runnable. Its events() helper and its overridden run method are shown below.

public boolean events() {
    boolean result = false;
    PollerEvent pe = null;
    for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
        result = true;
        try {
            // Call run() directly, on the Poller thread
            pe.run();
            pe.reset();
            if (running && !paused) {
                eventCache.push(pe);
            }
        } catch ( Throwable x ) {
            log.error("",x);
        }
    }
    return result;
}

@Override
public void run() {
    // Loop until destroy() is called
    while (true) {
        boolean hasEvents = false;

        try {
            if (!close) {
                // Run the queued PollerEvents
                hasEvents = events();
                if (wakeupCounter.getAndSet(-1) > 0) {
                    //if we are here, means we have other stuff to do
                    //do a non blocking select
                    keyCount = selector.selectNow();
                } else {
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error("",x);
            continue;
        }
        //either we timed out or we woke up, process events first
        if ( keyCount == 0 ) hasEvents = (hasEvents | events());

        // Get the selected keys, i.e. the registered keys whose events are ready
        Iterator<SelectionKey> iterator =
            keyCount > 0 ? selector.selectedKeys().iterator() : null;
        // Walk through the collection of ready keys and dispatch
        // any active event.
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
            // Attachment may be null if another thread has called
            // cancelledKey()
            if (attachment == null) {
                iterator.remove();
            } else {
                iterator.remove();
                // This is where the key is actually processed
                processKey(sk, attachment);
            }
        }//while

        //process timeouts
        timeout(keyCount,hasEvents);
    }//while

    getStopLatch().countDown();
}

  • If the queue contains elements, all queued events are executed first; PollerEvent's run method registers the channel with the Poller's Selector;
  • The SelectionKeys returned by select are then processed. Because the channel was registered in PollerEvent with a NioSocketWrapper attachment, it can be retrieved here through the SelectionKey's attachment method, after which processKey is called to handle the connected socket channel.

Next is processKey(), which handles reads and writes separately depending on which operations the key is ready for.

  1. Handle read events, for example building the Request object
  2. Handle write events, for example writing the generated Response back to the client through the socket

protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        if ( close ) {
            cancelledKey(sk);
        } else if ( sk.isValid() && attachment != null ) {
            if (sk.isReadable() || sk.isWritable() ) {
                if ( attachment.getSendfileData() != null ) {
                    processSendfile(sk,attachment, false);
                } else {
                    unreg(sk, attachment, sk.readyOps());
                    boolean closeSocket = false;
                    // 1. Handle read events, for example building the Request object
                    // Read goes before write
                    if (sk.isReadable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    // 2. Handle write events, for example writing the Response back to the client through the socket
                    if (!closeSocket && sk.isWritable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk);
                    }
                }
            }
        } else {
            //invalid key
            cancelledKey(sk);
        }
    } catch ( CancelledKeyException ckx ) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error("",t);
    }
}

Let's continue with processSocket().

  1. Take a processor from processorCache to handle the socket; the processor implementation is SocketProcessor
  2. Submit the processor to the worker thread pool for execution

public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        // 1. Take a `processor` from `processorCache` to handle the socket; the implementation is `SocketProcessor`
        SocketProcessorBase<S> sc = processorCache.pop();
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        // 2. Submit the `processor` to the worker thread pool for execution
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}

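The dispatch parameter indicates whether the socket should be handled on another thread; every call site in processKey above passes true.

  • When dispatch is true and a worker thread pool exists, executor.execute(sc) is called and the connected socket is then handled by the worker thread pool;
  • otherwise the Poller thread itself goes on to handle the connected socket.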
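
The two branches above can be illustrated with a minimal, self-contained sketch. It is not Tomcat code: DispatchSketch, Task and process are hypothetical names, and the only point is to show that the same Runnable either runs on a pool thread or inline on the caller (the Poller thread in Tomcat's case).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class DispatchSketch {

    /** Hypothetical stand-in for SocketProcessorBase: a task bound to one socket. */
    static class Task implements Runnable {
        final String socketId;
        volatile String ranOn; // name of the thread that executed the task

        Task(String socketId) {
            this.socketId = socketId;
        }

        @Override
        public void run() {
            ranOn = Thread.currentThread().getName();
        }
    }

    /** Mirrors the dispatch decision: use the pool if asked and available, else run inline. */
    static Task process(String socketId, boolean dispatch, ExecutorService executor) {
        Task task = new Task(socketId);
        if (dispatch && executor != null) {
            executor.execute(task); // handled by a worker thread
        } else {
            task.run();             // handled by the calling thread
        }
        return task;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Task inline = process("s1", false, pool);
        Task dispatched = process("s2", true, pool);
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("inline ran on:     " + inline.ranOn);
        System.out.println("dispatched ran on: " + dispatched.ranOn);
    }
}
```

Running the task inline keeps the calling thread busy until the request is done, which is presumably why processKey always passes true.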

createSocketProcessor is an abstract method of the AbstractEndpoint class; NioEndpoint implements it:

@Override
protected SocketProcessorBase<NioChannel> createSocketProcessor(
        SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
    return new SocketProcessor(socketWrapper, event);
}

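Next we analyze SocketProcessor.doRun() (SocketProcessor.run() ends up calling it). The method hands the processing logic to the Handler; when event is null, the event is treated as OPEN_READ.

The class comment notes that SocketProcessor plays the same role as the Worker.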

/**
 * This class is the equivalent of the Worker, but will simply use in an
 * external Executor thread pool.
 */
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {

    public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
        super(socketWrapper, event);
    }

    @Override
    protected void doRun() {
        NioChannel socket = socketWrapper.getSocket();
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

        try {
            int handshake = -1;

            try {
                if (key != null) {
                    if (socket.isHandshakeComplete()) {
                        // No TLS handshaking required. Let the handler
                        // process this socket / event combination.
                        handshake = 0;
                    } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                            event == SocketEvent.ERROR) {
                        // Unable to complete the TLS handshake. Treat it as
                        // if the handshake failed.
                        handshake = -1;
                    } else {
                        handshake = socket.handshake(key.isReadable(), key.isWritable());
                        // The handshake process reads/writes from/to the
                        // socket. status may therefore be OPEN_WRITE once
                        // the handshake completes. However, the handshake
                        // happens when the socket is opened so the status
                        // must always be OPEN_READ after it completes. It
                        // is OK to always set this as it is only used if
                        // the handshake completes.
                        event = SocketEvent.OPEN_READ;
                    }
                }
            } catch (IOException x) {
                handshake = -1;
                if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
            } catch (CancelledKeyException ckx) {
                handshake = -1;
            }
            if (handshake == 0) {
                SocketState state = SocketState.OPEN;
                // Process the request from this socket
                // Hand the processing logic to the Handler; a null event is treated as OPEN_READ
                if (event == null) {
                    state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                } else {
                    state = getHandler().process(socketWrapper, event);
                }
                if (state == SocketState.CLOSED) {
                    close(socket, key);
                }
            } else if (handshake == -1 ) {
                close(socket, key);
            } else if (handshake == SelectionKey.OP_READ){
                socketWrapper.registerReadInterest();
            } else if (handshake == SelectionKey.OP_WRITE){
                socketWrapper.registerWriteInterest();
            }
        } catch (CancelledKeyException cx) {
            socket.getPoller().cancelledKey(key);
        } catch (VirtualMachineError vme) {
            ExceptionUtils.handleThrowable(vme);
        } catch (Throwable t) {
            log.error("", t);
            socket.getPoller().cancelledKey(key);
        } finally {
            socketWrapper = null;
            event = null;
            //return to cache
            if (running && !paused) {
                processorCache.push(this);
            }
        }
    }
}
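
The finally block above pushes the SocketProcessor back into processorCache, which pairs with the pop() in processSocket(). The recycling pattern can be sketched on its own as follows. This is only an illustration under assumptions: ProcessorCacheSketch, Task, acquire and release are hypothetical names, and an ArrayDeque guarded by synchronized stands in for whatever stack Tomcat actually uses.

```java
import java.util.ArrayDeque;

class ProcessorCacheSketch {

    /** Hypothetical reusable task, standing in for SocketProcessor. */
    static class Task {
        String socketId;

        void reset(String socketId) {
            this.socketId = socketId;
        }
    }

    private final ArrayDeque<Task> cache = new ArrayDeque<>();
    private int created = 0;

    /** Pop a cached task if one exists, otherwise create one; then bind it to the socket. */
    synchronized Task acquire(String socketId) {
        Task task = cache.pollFirst();
        if (task == null) {
            task = new Task();
            created++;
        }
        task.reset(socketId);
        return task;
    }

    /** Clear the task's references and return it to the cache for reuse. */
    synchronized void release(Task task) {
        task.socketId = null;
        cache.addFirst(task);
    }

    synchronized int createdCount() {
        return created;
    }

    public static void main(String[] args) {
        ProcessorCacheSketch pool = new ProcessorCacheSketch();
        Task first = pool.acquire("socket-1");
        pool.release(first);
        Task second = pool.acquire("socket-2");
        System.out.println("same object reused: " + (first == second));
        System.out.println("objects created: " + pool.createdCount());
    }
}
```

Reusing task objects this way avoids allocating a new processor for every socket event, at the cost of having to reset state carefully before each reuse.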

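The key method of the Handler is process(). Although it has many conditional branches, its logic is straightforward: it mainly calls processor.process(). The listing below is abridged.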

@Override
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
    try {

        if (processor == null) {
            processor = getProtocol().createProcessor();
            register(processor);
        }

        processor.setSslSupport(
                wrapper.getSslSupport(getProtocol().getClientCertProvider()));

        // Associate the processor with the connection
        connections.put(socket, processor);

        SocketState state = SocketState.CLOSED;
        do {
            // The key call we have been looking for
            state = processor.process(wrapper, status);

        } while ( state == SocketState.UPGRADING);
        return state;
    }
    catch (Throwable e) {
        ExceptionUtils.handleThrowable(e);
        // any other exception or error is odd. Here we log it
        // with "ERROR" level, so it will show up even on
        // less-than-verbose logs.
        getLog().error(sm.getString("abstractConnectionHandler.error"), e);
    } finally {
        ContainerThreadMarker.clear();
    }

    // Make sure socket/processor is removed from the list of current
    // connections
    connections.remove(socket);
    release(processor);
    return SocketState.CLOSED;
}

Processor

createProcessor

protected Http11Processor createProcessor() {
    // Build the Http11Processor.
    // Note: JIoEndpoint is the BIO endpoint, which was removed in Tomcat 8.5,
    // so this listing reflects an older Tomcat version.
    Http11Processor processor = new Http11Processor(
            proto.getMaxHttpHeaderSize(), (JIoEndpoint)proto.endpoint, // 1. maximum size of the HTTP header
            proto.getMaxTrailerSize(),proto.getMaxExtensionSize());
    processor.setAdapter(proto.getAdapter());
    // 2. With keep-alive enabled, the maximum number of requests served on one socket
    processor.setMaxKeepAliveRequests(proto.getMaxKeepAliveRequests());
    // 3. Keep-alive timeout
    processor.setKeepAliveTimeout(proto.getKeepAliveTimeout());
    // 4. Timeout used while a data upload is in progress (default 300 * 1000 ms)
    processor.setConnectionUploadTimeout(
            proto.getConnectionUploadTimeout());
    processor.setDisableUploadTimeout(proto.getDisableUploadTimeout());
    // 5. A response body larger than this value is compressed with gzip
    processor.setCompressionMinSize(proto.getCompressionMinSize());
    // 6. Whether response compression is enabled
    processor.setCompression(proto.getCompression());
    processor.setNoCompressionUserAgents(proto.getNoCompressionUserAgents());
    // 7. Only bodies of these MIME types, e.g. "text/html,text/xml,text/plain", are compressed
    processor.setCompressableMimeTypes(proto.getCompressableMimeTypes());
    processor.setRestrictedUserAgents(proto.getRestrictedUserAgents());
    // 8. Socket buffer size, default 9000
    processor.setSocketBuffer(proto.getSocketBuffer());
    // 9. Maximum size of a POST body that the container will save (default 4 * 1024)
    processor.setMaxSavePostSize(proto.getMaxSavePostSize());
    processor.setServer(proto.getServer());
    processor.setDisableKeepAlivePercentage(
            proto.getDisableKeepAlivePercentage());
    register(processor);
    return processor;
}

Here we focus on how the Processor handles read events, which comes down to a single line of code: the call to service().

public abstract class AbstractProcessorLight implements Processor {

    @Override
    public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
            throws IOException {

        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
                state = dispatch(nextDispatch.getSocketStatus());
            } else if (status == SocketEvent.DISCONNECT) {
                // Do nothing here, just wait for it to get recycled
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
                state = dispatch(status);
                if (state == SocketState.OPEN) {
                    // There may be pipe-lined data to read. If the data isn't
                    // processed now, execution will exit this loop and call
                    // release() which will recycle the processor (and input
                    // buffer) deleting any pipe-lined data. To avoid this,
                    // process it now.
                    state = service(socketWrapper);
                }
            } else if (status == SocketEvent.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ){
                // Call the service() method
                state = service(socketWrapper);
            } else {
                // Default to closing the socket if the SocketEvent passed in
                // is not consistent with the current state of the Processor
                state = SocketState.CLOSED;
            }

            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + socketWrapper +
                        "], Status in: [" + status +
                        "], State out: [" + state + "]");
            }

            if (state != SocketState.CLOSED && isAsync()) {
                state = asyncPostProcess();
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Socket: [" + socketWrapper +
                            "], State after async post processing: [" + state + "]");
                }
            }

            if (dispatches == null || !dispatches.hasNext()) {
                // Only returns non-null iterator if there are
                // dispatches to process.
                dispatches = getIteratorAndClearDispatches();
            }
        } while (state == SocketState.ASYNC_END ||
                dispatches != null && state != SocketState.CLOSED);

        return state;
    }
}

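Only two things in Processor.service() really matter here. The method is very long (more than 200 lines), so its code is not reproduced.

  1. Create the Request and Response objects
  2. Call Adapter.service(), passing in the Request and Response objects just created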

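Adapter

The Adapter connects the Connector and the Container, acting as the bridge between them. The Processor calls Adapter.service(). Let's look at what that method does; its main steps are:

  1. From the Coyote Request and Response objects, obtain the Connector's Request and Response objects (which implement HttpServletRequest and HttpServletResponse)
  2. Add extra headers
  3. Parse the request; this step handles proxy settings, sets the necessary headers, and so on
  4. Enter the container proper by invoking the valves of the Engine container's Pipeline
  5. Complete the request through request.finishRequest and response.finishResponse (which flushes the data in the OutputBuffer to the browser)
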
@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
        throws Exception {

    // 1. From the Coyote Request and Response, obtain the Connector's Request and Response
    //    (they implement HttpServletRequest and HttpServletResponse); create them on first use
    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);

    if (request == null) {
        // Create objects
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        response = connector.createResponse();
        response.setCoyoteResponse(res);

        // Link objects
        request.setResponse(response);
        response.setRequest(request);

        // Set as notes
        req.setNote(ADAPTER_NOTES, request);
        res.setNote(ADAPTER_NOTES, response);

        // Set query string encoding
        req.getParameters().setQueryStringCharset(connector.getURICharset());
    }

    // 2. Add extra headers
    if (connector.getXpoweredBy()) {
        response.addHeader("X-Powered-By", POWERED_BY);
    }

    boolean async = false;
    boolean postParseSuccess = false;

    req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

    try {
        // Parse and set Catalina and configuration specific
        // request parameters
        // 3. Parse the request; this handles proxy settings, sets the necessary headers, and so on.
        // It also performs request mapping (resolving the Host, Context and Wrapper,
        // parsing the parameters after the URI, and the session ID)
        postParseSuccess = postParseRequest(req, request, res, response);
        if (postParseSuccess) {
            //check valves if we support async
            request.setAsyncSupported(
                    connector.getService().getContainer().getPipeline().isAsyncSupported());
            // Calling the container
            // 4. Enter the container proper: invoke the valves of the Engine container's Pipeline
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
        }
        if (request.isAsync()) {
            async = true;
            ReadListener readListener = req.getReadListener();
            if (readListener != null && request.isFinished()) {
                // Possible the all data may have been read during service()
                // method so this needs to be checked here
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    if (req.sendAllDataReadEvent()) {
                        req.getReadListener().onAllDataRead();
                    }
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            }

            Throwable throwable =
                    (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

            // If an async request was started, is not going to end once
            // this container thread finishes and an error occurred, trigger
            // the async error process
            if (!request.isAsyncCompleting() && throwable != null) {
                request.getAsyncContextInternal().setErrorState(throwable, true);
            }
        } else {
            // 5. Complete the request through request.finishRequest and response.finishResponse
            //    (which flushes the data in the OutputBuffer to the browser)
            request.finishRequest();
            // Flush the data in the OutputBuffer of org.apache.catalina.connector.Response into the
            // InternalOutputBuffer of org.apache.coyote.Response, and finally write it out through the
            // socket's OutputStream (this assembles the header and body of the HTTP response and
            // sends them to the remote end)
            response.finishResponse();
        }

    } catch (IOException e) {
        // Ignore
    } finally {
        AtomicBoolean error = new AtomicBoolean(false);
        res.action(ActionCode.IS_ERROR, error);

        if (request.isAsyncCompleting() && error.get()) {
            // Connection will be forcibly closed which will prevent
            // completion happening at the usual point. Need to trigger
            // call to onComplete() here.
            res.action(ActionCode.ASYNC_POST_PROCESS,  null);
            async = false;
        }

        // Access log
        if (!async && postParseSuccess) {
            // Log only if processing was invoked.
            // If postParseRequest() failed, it has already logged it.
            Context context = request.getContext();
            // If the context is null, it is likely that the endpoint was
            // shutdown, this connection closed and the request recycled in
            // a different thread. That thread will have updated the access
            // log so it is OK not to update the access log here in that
            // case.
            if (context != null) {
                context.logAccess(request, response,
                        System.currentTimeMillis() - req.getStartTime(), false);
            }
        }

        req.getRequestProcessor().setWorkerThreadName(null);

        // Recycle the wrapper request and response
        if (!async) {
            request.recycle();
            response.recycle();
        }
    }
}
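
Step 1 of service() creates the connector-level wrapper lazily and caches it in a "note" slot of the low-level request, so later requests handled through the same Coyote object reuse it. A minimal sketch of that idiom is shown below. All class names, the slot index and the array size are assumptions made for illustration, not Tomcat's actual definitions.

```java
class AdapterNotesSketch {

    // Hypothetical slot index; plays the role of ADAPTER_NOTES.
    static final int ADAPTER_NOTES = 1;

    /** Stand-in for org.apache.coyote.Request: a low-level object with note slots. */
    static class CoyoteRequest {
        private final Object[] notes = new Object[8]; // size chosen arbitrarily

        Object getNote(int pos) {
            return notes[pos];
        }

        void setNote(int pos, Object value) {
            notes[pos] = value;
        }
    }

    /** Stand-in for the connector-level Request that wraps the Coyote one. */
    static class ConnectorRequest {
        CoyoteRequest coyoteRequest;

        void setCoyoteRequest(CoyoteRequest coyoteRequest) {
            this.coyoteRequest = coyoteRequest;
        }
    }

    /** Return the cached wrapper, creating and caching it on first use. */
    static ConnectorRequest adapt(CoyoteRequest req) {
        ConnectorRequest request = (ConnectorRequest) req.getNote(ADAPTER_NOTES);
        if (request == null) {
            request = new ConnectorRequest();
            request.setCoyoteRequest(req);
            req.setNote(ADAPTER_NOTES, request);
        }
        return request;
    }

    public static void main(String[] args) {
        CoyoteRequest req = new CoyoteRequest();
        ConnectorRequest first = adapt(req);
        ConnectorRequest second = adapt(req);
        System.out.println("same wrapper reused: " + (first == second));
    }
}
```

Because the wrapper outlives a single request, service() has to call recycle() on it in the finally block before it can be reused.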

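Request preprocessing

The postParseRequest method preprocesses the request: it strips the path parameters (introduced by semicolons) from the path, URI-decodes it, and normalizes it (resolving "." and "..").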

protected boolean postParseRequest(org.apache.coyote.Request req, Request request,
        org.apache.coyote.Response res, Response response) throws IOException, ServletException {
    // (some code omitted)
    MessageBytes undecodedURI = req.requestURI();
    MessageBytes decodedURI = req.decodedURI();

    if (undecodedURI.getType() == MessageBytes.T_BYTES) {
        // Copy the raw URI to the decodedURI
        decodedURI.duplicate(undecodedURI);

        // Parse the path parameters. This will:
        //   - strip out the path parameters
        //   - convert the decodedURI to bytes
        parsePathParameters(req, request);

        // URI decoding
        // %xx decoding of the URL
        try {
            req.getURLDecoder().convert(decodedURI, false);
        } catch (IOException ioe) {
            res.setStatus(400);
            res.setMessage("Invalid URI: " + ioe.getMessage());
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
        // Normalization
        if (!normalize(req.decodedURI())) {
            res.setStatus(400);
            res.setMessage("Invalid URI");
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
        // Character decoding
        convertURI(decodedURI, request);
        // Check that the URI is still normalized
        if (!checkNormalize(req.decodedURI())) {
            res.setStatus(400);
            res.setMessage("Invalid URI character encoding");
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
    } else {
        /* The URI is chars or String, and has been sent using an in-memory
         * protocol handler. The following assumptions are made:
         * - req.requestURI() has been set to the 'original' non-decoded,
         *   non-normalized URI
         * - req.decodedURI() has been set to the decoded, normalized form
         *   of req.requestURI()
         */
        decodedURI.toChars();
        // Remove all path parameters; any needed path parameter should be set
        // using the request object rather than passing it in the URL
        CharChunk uriCC = decodedURI.getCharChunk();
        int semicolon = uriCC.indexOf(';');
        if (semicolon > 0) {
            decodedURI.setChars
                (uriCC.getBuffer(), uriCC.getStart(), semicolon);
        }
    }

    // Request mapping.
    MessageBytes serverName;
    if (connector.getUseIPVHosts()) {
        serverName = req.localName();
        if (serverName.isNull()) {
            // well, they did ask for it
            res.action(ActionCode.REQ_LOCAL_NAME_ATTRIBUTE, null);
        }
    } else {
        serverName = req.serverName();
    }

    // Version for the second mapping loop and
    // Context that we expect to get for that version
    String version = null;
    Context versionContext = null;
    boolean mapRequired = true;

    while (mapRequired) {
        // This will map the latest version by default
        connector.getService().getMapper().map(serverName, decodedURI,
                version, request.getMappingData());
        // (some code omitted)
    }
    // (some code omitted)
}

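Taking the case where the MessageBytes type is T_BYTES as an example:

  • parsePathParameters strips the path parameters, which are introduced by semicolons, from the URI;
  • req.getURLDecoder() returns a UDecoder instance whose convert method decodes the URI. Decoding here only replaces each three-character percent escape (%XX) with the byte whose value is given by the two hexadecimal digits after the percent sign;
  • normalize normalizes the URI, resolving "." and ".." in the path;
  • convertURI uses the Connector's URIEncoding attribute to convert the URI's bytes into characters;
  • Note the line connector.getService().getMapper().map(serverName, decodedURI, version, request.getMappingData()). When the Service started, MapperListener registered every Host and Context of that Service. When a Context is selected by URI, the Mapper's map method compares the URI produced by convertURI against the path of each Context.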
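
The first three steps of that list can be tried out in isolation with the simplified sketch below. It is an illustration written from the description above, not Tomcat's implementation: the class and method names are hypothetical, it works on Strings instead of MessageBytes, it assumes the raw URI is ASCII, and it folds the byte-to-character conversion into the decoding step using UTF-8.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

class UriPreprocessSketch {

    /** Removes ";name=value" path parameters from every path segment. */
    static String stripPathParameters(String uri) {
        StringBuilder sb = new StringBuilder();
        for (String segment : uri.split("/", -1)) {
            int semicolon = segment.indexOf(';');
            sb.append(semicolon >= 0 ? segment.substring(0, semicolon) : segment).append('/');
        }
        sb.setLength(sb.length() - 1); // drop the extra trailing '/'
        return sb.toString();
    }

    /** Replaces each %XX escape with the byte it encodes, then reads the bytes as UTF-8. */
    static String percentDecode(String uri) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < uri.length(); i++) {
            char c = uri.charAt(i);
            if (c == '%') {
                if (i + 2 >= uri.length()) {
                    throw new IllegalArgumentException("truncated escape");
                }
                int hi = Character.digit(uri.charAt(i + 1), 16);
                int lo = Character.digit(uri.charAt(i + 2), 16);
                if (hi < 0 || lo < 0) {
                    throw new IllegalArgumentException("bad escape");
                }
                out.write((hi << 4) | lo);
                i += 2;
            } else {
                out.write(c); // assumes the raw URI is ASCII
            }
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    /** Resolves "." and ".."; returns null if the path would climb above the root. */
    static String normalize(String path) {
        if (!path.startsWith("/")) {
            return null;
        }
        Deque<String> segments = new ArrayDeque<>();
        for (String segment : path.split("/")) {
            if (segment.isEmpty() || segment.equals(".")) {
                continue;
            }
            if (segment.equals("..")) {
                if (segments.isEmpty()) {
                    return null;
                }
                segments.removeLast();
            } else {
                segments.addLast(segment);
            }
        }
        String joined = "/" + String.join("/", segments);
        boolean dirLike = path.endsWith("/") || path.endsWith("/.") || path.endsWith("/..");
        return (dirLike && joined.length() > 1) ? joined + "/" : joined;
    }

    static String preprocess(String rawUri) {
        return normalize(percentDecode(stripPathParameters(rawUri)));
    }

    public static void main(String[] args) {
        System.out.println(preprocess("/app;v=1/%7Euser/./docs/../index.html"));
        System.out.println(preprocess("/app/../../etc/passwd"));
    }
}
```

Stripping and decoding before normalization matters: an encoded "%2e%2e" segment only becomes ".." after decoding, so normalizing first would miss it.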

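Container processing

If the request can be passed to the container's Pipeline, that is, when postParseRequest returns true, the container takes over. The service method contains the line connector.getService().getContainer().getPipeline().getFirst().invoke(request, response):

  • calling getService on the Connector returns the StandardService;
  • calling getContainer on the StandardService returns the StandardEngine;
  • calling getPipeline on the StandardEngine returns the StandardPipeline associated with it;
  • calling getFirst on the StandardPipeline returns its first Valve (the basic valve if no other valve has been added), and invoke(request, response) starts the processing inside the container.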
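
The getFirst().invoke(...) call at the end of that chain follows the chain-of-responsibility pattern. The sketch below shows the idea with hypothetical Valve and Pipeline classes; it is a simplification for illustration only, and the valve names used in main are just labels.

```java
import java.util.ArrayList;
import java.util.List;

class PipelineSketch {

    /** Hypothetical valve: records its name, then passes control to the next valve. */
    static class Valve {
        final String name;
        Valve next;

        Valve(String name) {
            this.name = name;
        }

        void invoke(List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.invoke(trace);
            }
        }
    }

    /** Simplified pipeline: optional valves followed by exactly one basic valve. */
    static class Pipeline {
        private final Valve basic;
        private Valve first;

        Pipeline(Valve basic) {
            this.basic = basic;
        }

        /** Appends a valve just before the basic valve. */
        void addValve(Valve valve) {
            if (first == null) {
                first = valve;
            } else {
                Valve tail = first;
                while (tail.next != basic) {
                    tail = tail.next;
                }
                tail.next = valve;
            }
            valve.next = basic;
        }

        /** Returns the first added valve, or the basic valve if none was added. */
        Valve getFirst() {
            return first != null ? first : basic;
        }
    }

    public static void main(String[] args) {
        Pipeline engine = new Pipeline(new Valve("StandardEngineValve"));
        List<String> trace = new ArrayList<>();
        engine.getFirst().invoke(trace);
        System.out.println(trace);

        engine.addValve(new Valve("AccessLogValve"));
        trace.clear();
        engine.getFirst().invoke(trace);
        System.out.println(trace);
    }
}
```

Keeping the basic valve at the tail means every added valve runs before the container's own logic, and callers only ever need getFirst() to start the chain.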

The rest of the processing flow is covered in the next article.