Tomcat Source Code Analysis (8): HTTP Request Processing (Part 1)
We have finally reached the analysis of the `Connector`, the most complex piece of functionality in Tomcat. As the name suggests, a connector must connect things — but what? The `Connector` accepts requests, wraps each one into a Request and a Response, and hands them to the `Container` for processing; once the `Container` is done, the result goes back through the `Connector` and is returned to the client.

To understand the `Connector`, we need to ask ourselves four questions:

- (1) How does the `Connector` accept requests?
- (2) How are requests wrapped into Request and Response objects?
- (3) How are the wrapped Request and Response handed to the `Container` for processing?
- (4) After the `Container` finishes, how is the result handed back to the `Connector` and returned to the client?
Let's start with a diagram of the overall structure of the `Connector`.

Note: different protocols and different I/O modes lead to different `ProtocolHandler` implementations. In Tomcat 8.5, the class hierarchy of `ProtocolHandler` is shown in the figure below.
A few remarks on the class hierarchy above:

- AJP and HTTP/1.1 are two different protocols.
- NIO, NIO2, and APR are different I/O modes.
- A protocol and an I/O mode can be combined freely.

A `ProtocolHandler` consists of three parts: `Endpoint`, `Processor`, and `Adapter`.
- The `Endpoint` handles the underlying socket connections, the `Processor` wraps the sockets received by the `Endpoint` into Request objects, and the `Adapter` hands the Request to the `Container` for actual processing.
- Since the `Endpoint` deals with the underlying socket connections, it implements the TCP/IP layer; the `Processor` implements the HTTP protocol; and the `Adapter` adapts the request to the Servlet container for actual processing.
- `AbstractEndpoint`, the abstract base class of `Endpoint`, defines two inner classes, `Acceptor` and `AsyncTimeout`, and a `Handler` interface. `Acceptor` listens for incoming connections, `AsyncTimeout` checks asynchronous requests for timeouts, and `Handler` processes the accepted sockets by delegating internally to a `Processor`.
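The three-stage handoff described above can be sketched as plain interfaces. This is a deliberately simplified illustration, not Tomcat's actual API: the `Endpoint`, `Processor`, and `Adapter` interfaces and the `handle` helper below are hypothetical, with a `String` standing in for the socket bytes.

```java
public class ConnectorPipelineSketch {
    // Endpoint: owns the raw bytes (a String stands in for a socket here)
    interface Endpoint { String receive(); }
    // Processor: parses raw bytes into a "request"
    interface Processor { String toRequest(String raw); }
    // Adapter: hands the request to the container and returns a "response"
    interface Adapter { String service(String request); }

    // The handoff: Endpoint -> Processor -> Adapter
    static String handle(Endpoint e, Processor p, Adapter a) {
        return a.service(p.toRequest(e.receive()));
    }

    public static void main(String[] args) {
        Endpoint endpoint = () -> "GET / HTTP/1.1";
        Processor processor = raw -> "Request(" + raw + ")";
        Adapter adapter = req -> "Response for " + req;
        System.out.println(handle(endpoint, processor, adapter));
    }
}
```

The real classes are, of course, far richer, but the direction of data flow is exactly this.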
At this point, questions (1), (2), and (3) are answered. As for (4), it becomes clear once we understand the `Container`, which previous chapters have already analyzed in detail.
Entry point for Connector source code analysis
In the source of `StandardService`, the standard implementation of `Service`, we find that its `init()`, `start()`, `stop()`, and `destroy()` methods each invoke the same-named method on its connectors. One `Service` owns multiple `Connector`s.
Service.init()
```java
@Override
protected void initInternal() throws LifecycleException {

    super.initInternal();

    if (engine != null) {
        engine.init();
    }

    // Initialize any Executors
    for (Executor executor : findExecutors()) {
        if (executor instanceof JmxEnabled) {
            ((JmxEnabled) executor).setDomain(getDomain());
        }
        executor.init();
    }

    // Initialize mapper listener
    mapperListener.init();

    // Initialize our defined Connectors
    synchronized (connectorsLock) {
        for (Connector connector : connectors) {
            try {
                connector.init();
            } catch (Exception e) {
                String message = sm.getString(
                        "standardService.connector.initFailed", connector);
                log.error(message, e);

                if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"))
                    throw new LifecycleException(message);
            }
        }
    }
}
```
Service.start()
```java
@Override
protected void startInternal() throws LifecycleException {

    if (log.isInfoEnabled())
        log.info(sm.getString("standardService.start.name", this.name));
    setState(LifecycleState.STARTING);

    // Start our defined Container first
    if (engine != null) {
        synchronized (engine) {
            engine.start();
        }
    }

    synchronized (executors) {
        for (Executor executor : executors) {
            executor.start();
        }
    }

    mapperListener.start();

    // Start our defined Connectors second
    synchronized (connectorsLock) {
        for (Connector connector : connectors) {
            try {
                // If it has already failed, don't try and start it
                if (connector.getState() != LifecycleState.FAILED) {
                    connector.start();
                }
            } catch (Exception e) {
                log.error(sm.getString(
                        "standardService.connector.startFailed", connector), e);
            }
        }
    }
}
```
We know that `Connector` implements the `Lifecycle` interface, so it is a lifecycle component. The entry points of its startup logic are therefore `init()` and `start()`.
The Connector constructor
Before diving in, let's look at `server.xml`, which already reflects the overall structure of Tomcat's components.
```xml
<?xml version='1.0' encoding='utf-8'?>
<Server port="8005" shutdown="SHUTDOWN">
  <Listener className="org.apache.catalina.startup.VersionLoggerListener" />
  <Listener className="org.apache.catalina.core.AprLifecycleListener" SSLEngine="on" />
  <Listener className="org.apache.catalina.core.JreMemoryLeakPreventionListener" />
  <Listener className="org.apache.catalina.mbeans.GlobalResourcesLifecycleListener" />
  <Listener className="org.apache.catalina.core.ThreadLocalLeakPreventionListener" />

  <GlobalNamingResources>
    <Resource name="UserDatabase" auth="Container"
              type="org.apache.catalina.UserDatabase"
              description="User database that can be updated and saved"
              factory="org.apache.catalina.users.MemoryUserDatabaseFactory"
              pathname="conf/tomcat-users.xml" />
  </GlobalNamingResources>

  <Service name="Catalina">
    <Connector port="8080" protocol="HTTP/1.1"
               connectionTimeout="20000"
               redirectPort="8443" />
    <Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />

    <Engine name="Catalina" defaultHost="localhost">
      <Realm className="org.apache.catalina.realm.LockOutRealm">
        <Realm className="org.apache.catalina.realm.UserDatabaseRealm"
               resourceName="UserDatabase"/>
      </Realm>

      <Host name="localhost" appBase="webapps"
            unpackWARs="true" autoDeploy="true">
        <Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
               prefix="localhost_access_log" suffix=".txt"
               pattern="%h %l %u %t &quot;%r&quot; %s %b" />
      </Host>
    </Engine>
  </Service>
</Server>
```
In this file we see that a `Connector` has several key attributes, two of which are `port` and `protocol`. The default `server.xml` supports two protocols: `HTTP/1.1` and `AJP/1.3`. `HTTP/1.1` supports the HTTP/1.1 protocol, while `AJP/1.3` is used for communication with an Apache server.
Now let's look at the constructor.
```java
public Connector() {
    this(null); // 1. The no-arg constructor passes a null protocol, which defaults to `HTTP/1.1`
}

public Connector(String protocol) {
    setProtocol(protocol);
    // Instantiate protocol handler
    // 5. Instantiate the ProtocolHandler from its class name
    ProtocolHandler p = null;
    try {
        Class<?> clazz = Class.forName(protocolHandlerClassName);
        p = (ProtocolHandler) clazz.getConstructor().newInstance();
    } catch (Exception e) {
        log.error(sm.getString(
                "coyoteConnector.protocolHandlerInstantiationFailed"), e);
    } finally {
        this.protocolHandler = p;
    }

    if (Globals.STRICT_SERVLET_COMPLIANCE) {
        uriCharset = StandardCharsets.ISO_8859_1;
    } else {
        uriCharset = StandardCharsets.UTF_8;
    }
}

@Deprecated
public void setProtocol(String protocol) {

    boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
            AprLifecycleListener.getUseAprConnector();

    // 2. For `HTTP/1.1` or `null`, the ProtocolHandler is
    //    `org.apache.coyote.http11.Http11NioProtocol` (ignoring APR)
    if ("HTTP/1.1".equals(protocol) || protocol == null) {
        if (aprConnector) {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");
        } else {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");
        }
    }
    // 3. For `AJP/1.3`, the ProtocolHandler is
    //    `org.apache.coyote.ajp.AjpNioProtocol` (ignoring APR)
    else if ("AJP/1.3".equals(protocol)) {
        if (aprConnector) {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");
        } else {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");
        }
    }
    // 4. Otherwise, use the given protocol string as the ProtocolHandler class name
    else {
        setProtocolHandlerClassName(protocol);
    }
}
```
From the code above, the constructor does the following:

- The no-arg constructor passes a null protocol, which defaults to `HTTP/1.1`.
- For `HTTP/1.1` or `null`, the ProtocolHandler is `org.apache.coyote.http11.Http11NioProtocol` (ignoring APR).
- For `AJP/1.3`, the ProtocolHandler is `org.apache.coyote.ajp.AjpNioProtocol` (ignoring APR).
- Otherwise, the given protocol string itself is used as the ProtocolHandler class name.
- Finally, the ProtocolHandler instance is created from that class name.
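The protocol-to-handler mapping is easy to verify in isolation. Below is a hypothetical re-implementation of just that branching (the `handlerClassName` helper is mine, and the APR branch is ignored, as in the analysis above):

```java
public class ProtocolMappingSketch {

    // Mirrors the non-APR branches of Connector.setProtocol()
    static String handlerClassName(String protocol) {
        if (protocol == null || "HTTP/1.1".equals(protocol)) {
            return "org.apache.coyote.http11.Http11NioProtocol";
        } else if ("AJP/1.3".equals(protocol)) {
            return "org.apache.coyote.ajp.AjpNioProtocol";
        }
        // Anything else is treated as a fully-qualified handler class name
        return protocol;
    }

    public static void main(String[] args) {
        System.out.println(handlerClassName(null));
        System.out.println(handlerClassName("AJP/1.3"));
        System.out.println(handlerClassName("com.example.MyProtocol"));
    }
}
```

Note that this "fall-through" branch is what lets you plug a custom `ProtocolHandler` class name directly into the `protocol` attribute of `server.xml`.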
Connector.init()
```java
@Override
protected void initInternal() throws LifecycleException {

    super.initInternal();

    // Initialize adapter
    // 1. Initialize the Adapter
    adapter = new CoyoteAdapter(this);
    protocolHandler.setAdapter(adapter);

    // Make sure parseBodyMethodsSet has a default
    // 2. Set the list of methods whose body is parsed; the default is POST
    if (null == parseBodyMethodsSet) {
        setParseBodyMethods(getParseBodyMethods());
    }

    if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
        throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
                getProtocolHandlerClassName()));
    }
    if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
            protocolHandler instanceof AbstractHttp11JsseProtocol) {
        AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
                (AbstractHttp11JsseProtocol<?>) protocolHandler;
        if (jsseProtocolHandler.isSSLEnabled() &&
                jsseProtocolHandler.getSslImplementationName() == null) {
            // OpenSSL is compatible with the JSSE configuration, so use it if APR is available
            jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
        }
    }

    // 3. Initialize the ProtocolHandler
    try {
        protocolHandler.init();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
    }
}
```
`init()` does three things:

- Initializes the Adapter.
- Sets the list of methods whose body is parsed, POST by default.
- Initializes the ProtocolHandler.
From the `ProtocolHandler` class hierarchy we know that every `ProtocolHandler` implementation extends the abstract class `AbstractProtocol`, and the logic behind `protocolHandler.init()` lives in that abstract class. Let's analyze it.
```java
@Override
public void init() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
    }

    if (oname == null) {
        // Component not pre-registered so register it
        oname = createObjectName();
        if (oname != null) {
            Registry.getRegistry(null, null).registerComponent(this, oname, null);
        }
    }

    if (this.domain != null) {
        rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
        Registry.getRegistry(null, null).registerComponent(
                getHandler().getGlobal(), rgOname, null);
    }

    // 1. Set the endpoint name, by default: http-nio-{port}
    String endpointName = getName();
    endpoint.setName(endpointName.substring(1, endpointName.length() - 1));
    endpoint.setDomain(domain);

    // 2. Initialize the endpoint
    endpoint.init();
}
```
Next, what does `endpoint.init()` do? The method lives in the abstract class `AbstractEndpoint`, which follows the template-method pattern: it mainly calls the subclass's `bind()` method.
```java
public abstract void bind() throws Exception;
public abstract void unbind() throws Exception;
public abstract void startInternal() throws Exception;
public abstract void stopInternal() throws Exception;

public void init() throws Exception {
    // Invoke bind()
    if (bindOnInit) {
        bind();
        bindState = BindState.BOUND_ON_INIT;
    }
    if (this.domain != null) {
        // Register endpoint (as ThreadPool - historical name)
        oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
        Registry.getRegistry(null, null).registerComponent(this, oname, null);

        ObjectName socketPropertiesOname = new ObjectName(domain +
                ":type=ThreadPool,name=\"" + getName() + "\",subType=SocketProperties");
        socketProperties.setObjectName(socketPropertiesOname);
        Registry.getRegistry(null, null).registerComponent(socketProperties,
                socketPropertiesOname, null);

        for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
            registerJmx(sslHostConfig);
        }
    }
}
```
Continuing into `bind()`, we finally see what we were looking for. The key line is `serverSock.socket().bind(addr, getAcceptCount());`, which binds the `ServerSocket` to the given IP address and port.
```java
@Override
public void bind() throws Exception {

    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = (getAddress() != null ?
                new InetSocketAddress(getAddress(), getPort()) :
                new InetSocketAddress(getPort()));
        // Bind the ServerSocket to the given IP and port
        serverSock.socket().bind(addr, getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    serverSock.configureBlocking(true); // mimic APR behavior

    // Initialize thread count defaults for acceptor, poller
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn't seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    if (pollerThreadCount <= 0) {
        // minimum one poller thread
        pollerThreadCount = 1;
    }
    setStopLatch(new CountDownLatch(pollerThreadCount));

    // Initialize SSL if needed
    initialiseSsl();

    selectorPool.open();
}
```
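Stripped of the Tomcat-specific plumbing, the essential bind sequence above is plain Java NIO. A minimal, runnable sketch (port 0 requests an ephemeral port so it runs anywhere; the backlog value 100 here just stands in for `getAcceptCount()`):

```java
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class BindSketch {
    public static void main(String[] args) throws Exception {
        // Open the channel and bind it, exactly like NioEndpoint.bind() does
        ServerSocketChannel serverSock = ServerSocketChannel.open();
        serverSock.socket().bind(new InetSocketAddress(0), 100); // acceptCount -> backlog
        serverSock.configureBlocking(true); // mimic APR behavior, as in the Tomcat code
        System.out.println(serverSock.socket().isBound());
        serverSock.close();
    }
}
```

The blocking accept loop on this channel is exactly what the Acceptor thread (analyzed below) runs.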
That wraps up the analysis of `init()`; next we analyze `start()`. The key code is a single line: the call to `protocolHandler.start()`.
Connector.start()
```java
@Override
protected void startInternal() throws LifecycleException {

    // Validate settings before starting
    if (getPort() < 0) {
        throw new LifecycleException(sm.getString(
                "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
    }

    setState(LifecycleState.STARTING);

    try {
        protocolHandler.start();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
    }
}
```
Stepping into `protocolHandler.start()`, it does two things:

- Calls `endpoint.start()`.
- Starts the async-timeout thread, whose run unit is `AsyncTimeout`.
```java
@Override
public void start() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
    }

    // 1. Call `endpoint.start()`
    endpoint.start();

    // Start async timeout thread
    // 2. Start the async-timeout thread, whose run unit is `AsyncTimeout`
    asyncTimeout = new AsyncTimeout();
    Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
    int priority = endpoint.getThreadPriority();
    if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
        priority = Thread.NORM_PRIORITY;
    }
    timeoutThread.setPriority(priority);
    timeoutThread.setDaemon(true);
    timeoutThread.start();
}
```
Here we focus on the `endpoint.start()` method.
```java
public final void start() throws Exception {
    // 1. `bind()` was already covered in the analysis of `init()`
    if (bindState == BindState.UNBOUND) {
        bind();
        bindState = BindState.BOUND_ON_START;
    }
    startInternal();
}

@Override
public void startInternal() throws Exception {

    if (!running) {
        running = true;
        paused = false;

        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getEventCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());

        // Create worker collection
        // 2. Create the worker thread pool
        if (getExecutor() == null) {
            createExecutor();
        }

        // 3. Initialize the connection latch, which limits request concurrency
        initializeConnectionLatch();

        // Start poller threads
        // 4. Start the Poller threads. Pollers process the events produced by the
        //    acceptor threads, ultimately invoking the Handler's code
        pollers = new Poller[getPollerThreadCount()];
        for (int i = 0; i < pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }

        // 5. Start the Acceptor threads
        startAcceptorThreads();
    }
}

protected final void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new Acceptor[count];

    for (int i = 0; i < count; i++) {
        acceptors[i] = createAcceptor();
        String threadName = getName() + "-Acceptor-" + i;
        acceptors[i].setThreadName(threadName);
        Thread t = new Thread(acceptors[i], threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}
```
- `bind()` was already covered in the analysis of `init()`.
- A worker thread pool is created.
- The connection latch, which limits request concurrency, is initialized.
- The Poller threads are started. Pollers process the events produced by the acceptor threads, ultimately invoking the Handler's code.
- The Acceptor threads are started.
Connector request handling
Having analyzed the `Connector` startup logic, we now look at the HTTP request path: once a request leaves the client, what has to happen before it actually gets executed?
Acceptor
The Acceptor threads listen on the server socket and hand accepted connections off to the Poller threads. Their number is controlled by `AbstractEndpoint`'s `acceptorThreadCount` field and defaults to 1.
`AbstractEndpoint.Acceptor` is a static abstract inner class of `AbstractEndpoint` that implements the `Runnable` interface; part of its code is shown below:

```java
public abstract static class Acceptor implements Runnable {
    public enum AcceptorState {
        NEW, RUNNING, PAUSED, ENDED
    }

    protected volatile AcceptorState state = AcceptorState.NEW;
    public final AcceptorState getState() {
        return state;
    }

    private String threadName;
    protected final void setThreadName(final String threadName) {
        this.threadName = threadName;
    }
    protected final String getThreadName() {
        return threadName;
    }
}
```
`NioEndpoint`'s `Acceptor` member inner class extends `AbstractEndpoint.Acceptor`:
```java
protected class Acceptor extends AbstractEndpoint.Acceptor {

    @Override
    public void run() {

        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (running) {

            // Loop if endpoint is paused
            // 1. While the endpoint is paused (but still running), the Acceptor
            //    spins with a 50 ms sleep
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }

            // 2. If the endpoint has stopped running, the Acceptor stops too
            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;

            try {
                // if we have reached max connections, wait
                // 3. If the connection limit has been reached, wait until the count drops
                countUpOrAwaitConnection();

                SocketChannel socket = null;
                try {
                    // Accept the next incoming connection from the server socket
                    // 4. Accept the next connection
                    socket = serverSock.accept();
                } catch (IOException ioe) {
                    // We didn't get a socket
                    countDownConnection();
                    if (running) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket
                if (running && !paused) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    // 5. `setSocketOptions()` is the key: it passes the socket
                    //    to a Poller as an event
                    if (!setSocketOptions(socket)) {
                        closeSocket(socket);
                    }
                } else {
                    closeSocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        }
        state = AcceptorState.ENDED;
    }
}
```
From the code above:

- `countUpOrAwaitConnection()` checks the current connection count: if it is below `maxConnections` it is incremented, otherwise the thread waits.
- The `serverSock` in `socket = serverSock.accept()` is exactly the `ServerSocketChannel` opened in `NioEndpoint`'s `bind()`. To be able to reference that field, `NioEndpoint`'s `Acceptor` is a member class rather than a static one.
- The comment above the `setSocketOptions()` call states that the method hands the accepted socket off to a Poller thread.
`setSocketOptions()` then processes the accepted socket:
```java
protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        // Disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);

        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        // Register the channel with a Poller; note the two key methods,
        // `getPoller0()` and `Poller.register()`
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error("", t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}
```
- A `NioChannel` is popped off the `nioChannels` stack; if one is available (non-null) it is reused, otherwise a new `NioChannel` object is created.
- `getPoller0()` picks a Poller thread round-robin, and `Poller.register()` registers the `NioChannel` object with that Poller.
- The method returns true if the handoff to a Poller succeeded, false otherwise. On false, the Acceptor's `closeSocket()` closes the channel and the underlying socket connection and decrements the current connection count.
Poller
The Poller threads poll the accepted connections with minimal resources to keep them alive, and hand them to worker threads when data becomes available. Their number is controlled by `NioEndpoint`'s `pollerThreadCount` field and defaults to the smaller of 2 and the number of available processors. `Poller` implements `Runnable`, and as the constructor shows, each Poller opens its own new `Selector`.
```java
public class Poller implements Runnable {

    private Selector selector;
    private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

    // ... some code omitted

    public Poller() throws IOException {
        this.selector = Selector.open();
    }

    public Selector getSelector() {
        return selector;
    }

    // ... some code omitted
}
```
A channel is registered with a Poller through two key methods: `getPoller0()` and `Poller.register()`. Starting with `getPoller0()`: the notable detail is that it cycles through the Pollers using modulo arithmetic.
```java
/**
 * The socket poller.
 */
private Poller[] pollers = null;
private AtomicInteger pollerRotater = new AtomicInteger(0);

/**
 * Return an available poller in true round robin fashion.
 *
 * @return The next poller in sequence
 */
public Poller getPoller0() {
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}
```
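The round-robin scheme is worth trying out on its own. The sketch below isolates it (the `next` helper is mine): an `AtomicInteger` counter taken modulo the poller count. `Math.abs` guards against the counter wrapping to a negative value, although `Math.abs(Integer.MIN_VALUE)` is itself still negative, a known quirk of this pattern.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSketch {
    static final AtomicInteger rotater = new AtomicInteger(0);

    // Same selection logic as getPoller0(), with the poller count as a parameter
    static int next(int pollerCount) {
        return Math.abs(rotater.incrementAndGet()) % pollerCount;
    }

    public static void main(String[] args) {
        // With 2 pollers the indices alternate: 1, 0, 1, 0
        for (int i = 0; i < 4; i++) {
            System.out.print(next(2));
        }
        System.out.println();
    }
}
```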
Next, `Poller.register()`. The `Poller` maintains a synchronized queue of events, so the channels accepted by the `Acceptor` are placed on this queue; the enqueueing line is `events.offer(event);`.
```java
public class Poller implements Runnable {

    private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

    /**
     * Registers a newly created socket with the poller.
     *
     * @param socket The newly created socket
     */
    public void register(final NioChannel socket) {
        socket.setPoller(this);
        NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
        socket.setSocketWrapper(ka);
        ka.setPoller(this);
        ka.setReadTimeout(getSocketProperties().getSoTimeout());
        ka.setWriteTimeout(getSocketProperties().getSoTimeout());
        ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
        ka.setSecure(isSSLEnabled());
        ka.setReadTimeout(getConnectionTimeout());
        ka.setWriteTimeout(getConnectionTimeout());
        PollerEvent r = eventCache.pop();
        ka.interestOps(SelectionKey.OP_READ); // this is what OP_REGISTER turns into.
        if (r == null) {
            r = new PollerEvent(socket, ka, OP_REGISTER);
        } else {
            r.reset(socket, ka, OP_REGISTER);
        }
        addEvent(r);
    }

    private void addEvent(PollerEvent event) {
        events.offer(event);
        if (wakeupCounter.incrementAndGet() == 0) {
            selector.wakeup();
        }
    }
}
```
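The Acceptor/Poller handoff is a classic producer/consumer queue. A minimal stand-in (Tomcat's `SynchronizedQueue` is internal, so `ConcurrentLinkedQueue` is used here as a substitute, and strings stand in for `PollerEvent`s):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class EventQueueSketch {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> events = new ConcurrentLinkedQueue<>();

        // Acceptor side: register(channel) ends in events.offer(event)
        events.offer("register:socket-1");
        events.offer("register:socket-2");

        // Poller side: events() polls until the queue is empty and runs each event
        int processed = 0;
        while (events.poll() != null) {
            processed++; // here PollerEvent.run() would register the channel
        }
        System.out.println(processed);
    }
}
```

The `selector.wakeup()` call in `addEvent()` matters because the Poller may be parked in `selector.select(timeout)` when an event arrives; waking it up lets the new channel be registered promptly.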
PollerEvent
Next, `PollerEvent`. It implements the `Runnable` interface and represents a single polling event:
```java
public static class PollerEvent implements Runnable {

    private NioChannel socket;
    private int interestOps;
    private NioSocketWrapper socketWrapper;

    public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
        reset(ch, w, intOps);
    }

    public void reset(NioChannel ch, NioSocketWrapper w, int intOps) {
        socket = ch;
        interestOps = intOps;
        socketWrapper = w;
    }

    public void reset() {
        reset(null, null, 0);
    }

    @Override
    public void run() {
        if (interestOps == OP_REGISTER) {
            try {
                socket.getIOChannel().register(
                        socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
            } catch (Exception x) {
                log.error(sm.getString("endpoint.nio.registerFail"), x);
            }
        } else {
            final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
            try {
                if (key == null) {
                    socket.socketWrapper.getEndpoint().countDownConnection();
                    ((NioSocketWrapper) socket.socketWrapper).closed = true;
                } else {
                    final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                    if (socketWrapper != null) {
                        // We are registering the key to start with, reset the fairness counter.
                        int ops = key.interestOps() | interestOps;
                        socketWrapper.interestOps(ops);
                        key.interestOps(ops);
                    } else {
                        socket.getPoller().cancelledKey(key);
                    }
                }
            } catch (CancelledKeyException ckx) {
                try {
                    socket.getPoller().cancelledKey(key);
                } catch (Exception ignore) {
                }
            }
        }
    }
}
```
In the `run()` method:

- If the interest set is the custom `OP_REGISTER`, the connected socket channel has not yet been handled by a Poller thread, so the channel is registered on the Poller's `Selector` with interest set `OP_READ`, with a `NioSocketWrapper` object as the registration attachment. This is exactly what happens for events added via the Poller's `register()` method.
- Otherwise, the `SelectionKey` under which the channel is registered on the Poller's `Selector` is looked up, and the new interest ops are added to that key.
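The `OP_REGISTER` branch reduces to a standard NIO registration with an attachment. A self-contained sketch (a `Pipe` stands in for the socket channel so it runs anywhere, and a plain `String` stands in for the `NioSocketWrapper`):

```java
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class RegisterSketch {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false); // a channel must be non-blocking to register

        // Register for OP_READ with a wrapper object as the attachment,
        // just as PollerEvent.run() does for OP_REGISTER
        Object wrapper = "socket-wrapper"; // stands in for NioSocketWrapper
        SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ, wrapper);

        // The attachment can be read back from the key later, which is how
        // the Poller's select loop recovers the wrapper
        System.out.println(key.attachment());
        System.out.println(key.interestOps() == SelectionKey.OP_READ);
        selector.close();
    }
}
```

Retrieving the attachment via `SelectionKey.attachment()` is precisely what the Poller's `run()` loop does before calling `processKey()`.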
The Poller revisited
As mentioned above, the `Poller` class implements `Runnable`; its overridden `run()` method is shown below.
```java
public boolean events() {
    boolean result = false;

    PollerEvent pe = null;
    for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++) {
        result = true;
        try {
            // Invoke run() directly
            pe.run();
            pe.reset();
            if (running && !paused) {
                eventCache.push(pe);
            }
        } catch (Throwable x) {
            log.error("", x);
        }
    }

    return result;
}

@Override
public void run() {
    // Loop until destroy() is called
    while (true) {

        boolean hasEvents = false;

        try {
            if (!close) {
                // Run the queued PollerEvents
                hasEvents = events();
                if (wakeupCounter.getAndSet(-1) > 0) {
                    // If we are here, means we have other stuff to do
                    // Do a non blocking select
                    keyCount = selector.selectNow();
                } else {
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error("", x);
            continue;
        }
        // Either we timed out or we woke up, process events first
        if (keyCount == 0) {
            hasEvents = (hasEvents | events());
        }

        // Get all the selected keys (ready events) registered with this selector
        Iterator<SelectionKey> iterator =
                keyCount > 0 ? selector.selectedKeys().iterator() : null;
        // Walk through the collection of ready keys and dispatch
        // any active event.
        // Process the keys that are ready
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            NioSocketWrapper attachment = (NioSocketWrapper) sk.attachment();
            // Attachment may be null if another thread has called
            // cancelledKey()
            if (attachment == null) {
                iterator.remove();
            } else {
                iterator.remove();
                // This is where the key actually gets processed
                processKey(sk, attachment);
            }
        } // while

        // Process timeouts
        timeout(keyCount, hasEvents);
    } // while

    getStopLatch().countDown();
}
```
- If the event queue has elements, all queued events are run first; `PollerEvent.run()` registers the channel on the Poller's `Selector`.
- Each `SelectionKey` returned by select is then processed. Since the channel was registered in `PollerEvent` with a `NioSocketWrapper` attachment, it can be retrieved here via `SelectionKey.attachment()`; then `processKey()` is called to handle the connected socket channel.
Next we analyze `processKey()`, which handles reads and writes separately depending on the key's ready set:

- Read events, e.g. producing the Request object.
- Write events, e.g. writing the generated Response back to the client over the socket.
```java
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        if (close) {
            cancelledKey(sk);
        } else if (sk.isValid() && attachment != null) {
            if (sk.isReadable() || sk.isWritable()) {
                if (attachment.getSendfileData() != null) {
                    processSendfile(sk, attachment, false);
                } else {
                    unreg(sk, attachment, sk.readyOps());
                    boolean closeSocket = false;
                    // 1. Handle read events, e.g. producing the Request object
                    // Read goes before write
                    if (sk.isReadable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    // 2. Handle write events, e.g. writing the generated Response
                    //    back to the client over the socket
                    if (!closeSocket && sk.isWritable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk);
                    }
                }
            }
        } else {
            // Invalid key
            cancelledKey(sk);
        }
    } catch (CancelledKeyException ckx) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error("", t);
    }
}
```
We continue with `processSocket()`:

- A processor is taken from the `processorCache` to handle the socket; the implementation here is `SocketProcessor`.
- The processor is submitted to the worker thread pool for execution.
```java
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        // 1. Take a processor from `processorCache` to handle the socket;
        //    the implementation is `SocketProcessor`
        SocketProcessorBase<S> sc = processorCache.pop();
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        // 2. Submit the processor to the worker thread pool
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper), ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}
```
The `dispatch` parameter indicates whether the processing should run on a separate thread; every call site in `processKey()` above passes true.

- If `dispatch` is true and a worker thread pool exists, `executor.execute(sc)` runs, and the worker pool then handles the connected socket.
- Otherwise the Poller thread handles the connected socket itself.
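That two-way decision is small enough to demonstrate in isolation. A sketch (the `process` helper is mine, and a `Runnable` stands in for `SocketProcessorBase`):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DispatchSketch {
    // Mirrors the branch in processSocket(): pool if possible, inline otherwise
    static void process(Runnable sc, boolean dispatch, ExecutorService executor) {
        if (dispatch && executor != null) {
            executor.execute(sc); // a worker thread handles the socket
        } else {
            sc.run(); // the calling (poller) thread handles it itself
        }
    }

    public static void main(String[] args) throws Exception {
        StringBuilder log = new StringBuilder();

        // No executor: runs inline on the current thread
        process(() -> log.append("inline;"), false, null);

        // With an executor: runs on a pool thread
        ExecutorService pool = Executors.newSingleThreadExecutor();
        process(() -> log.append("pooled;"), true, pool);
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);

        System.out.println(log);
    }
}
```

Running inline is the degenerate path; in a real Tomcat the executor exists (it was created in `startInternal()`), so request processing almost always moves off the Poller thread here.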
`createSocketProcessor()` is an abstract method of `AbstractEndpoint`; `NioEndpoint` implements it:
```java
@Override
protected SocketProcessorBase<NioChannel> createSocketProcessor(
        SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
    return new SocketProcessor(socketWrapper, event);
}
```
Next we analyze `SocketProcessor.doRun()` (which `SocketProcessor.run()` ultimately calls). The method delegates the processing to the `Handler`; when `event` is null, it is treated as an `OPEN_READ` event. The class's Javadoc notes that `SocketProcessor` is the equivalent of the Worker, but runs in an external executor thread pool.
```java
/**
 * This class is the equivalent of the Worker, but will simply use in an
 * external Executor thread pool.
 */
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {

    public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
        super(socketWrapper, event);
    }

    @Override
    protected void doRun() {
        NioChannel socket = socketWrapper.getSocket();
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

        try {
            int handshake = -1;

            try {
                if (key != null) {
                    if (socket.isHandshakeComplete()) {
                        // No TLS handshaking required. Let the handler
                        // process this socket / event combination.
                        handshake = 0;
                    } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                            event == SocketEvent.ERROR) {
                        // Unable to complete the TLS handshake. Treat it as
                        // if the handshake failed.
                        handshake = -1;
                    } else {
                        handshake = socket.handshake(key.isReadable(), key.isWritable());
                        // The handshake process reads/writes from/to the
                        // socket. status may therefore be OPEN_WRITE once
                        // the handshake completes. However, the handshake
                        // happens when the socket is opened so the status
                        // must always be OPEN_READ after it completes. It
                        // is OK to always set this as it is only used if
                        // the handshake completes.
                        event = SocketEvent.OPEN_READ;
                    }
                }
            } catch (IOException x) {
                handshake = -1;
                if (log.isDebugEnabled()) log.debug("Error during SSL handshake", x);
            } catch (CancelledKeyException ckx) {
                handshake = -1;
            }
            if (handshake == 0) {
                SocketState state = SocketState.OPEN;
                // Process the request from this socket
                // Delegate to the `Handler`; a null event means `OPEN_READ`
                if (event == null) {
                    state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                } else {
                    state = getHandler().process(socketWrapper, event);
                }
                if (state == SocketState.CLOSED) {
                    close(socket, key);
                }
            } else if (handshake == -1) {
                close(socket, key);
            } else if (handshake == SelectionKey.OP_READ) {
                socketWrapper.registerReadInterest();
            } else if (handshake == SelectionKey.OP_WRITE) {
                socketWrapper.registerWriteInterest();
            }
        } catch (CancelledKeyException cx) {
            socket.getPoller().cancelledKey(key);
        } catch (VirtualMachineError vme) {
            ExceptionUtils.handleThrowable(vme);
        } catch (Throwable t) {
            log.error("", t);
            socket.getPoller().cancelledKey(key);
        } finally {
            socketWrapper = null;
            event = null;
            // return to cache
            if (running && !paused) {
                processorCache.push(this);
            }
        }
    }
}
```
The `Handler`'s key method is `process()`. Despite its many conditional branches, the logic is quite clear: it mainly calls the `processor.process()` method.
```java
@Override
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
    try {
        if (processor == null) {
            processor = getProtocol().createProcessor();
            register(processor);
        }

        processor.setSslSupport(
                wrapper.getSslSupport(getProtocol().getClientCertProvider()));

        // Associate the processor with the connection
        connections.put(socket, processor);

        SocketState state = SocketState.CLOSED;
        do {
            // The key line we have been looking for
            state = processor.process(wrapper, status);
        } while (state == SocketState.UPGRADING);

        return state;
    } catch (Throwable e) {
        ExceptionUtils.handleThrowable(e);
        // Any other exception or error is odd. Here we log it
        // with "ERROR" level, so it will show up even on
        // less-than-verbose logs.
        getLog().error(sm.getString("abstractConnectionHandler.error"), e);
    } finally {
        ContainerThreadMarker.clear();
    }

    // Make sure socket/processor is removed from the list of current
    // connections
    connections.remove(socket);
    release(processor);
    return SocketState.CLOSED;
}
```
Processor
createProcessor
```java
protected Http11Processor createProcessor() {
    // Build an Http11Processor
    Http11Processor processor = new Http11Processor(
            proto.getMaxHttpHeaderSize(),   // 1. Maximum size of an HTTP header
            (JIoEndpoint) proto.endpoint,
            proto.getMaxTrailerSize(), proto.getMaxExtensionSize());
    processor.setAdapter(proto.getAdapter());
    // 2. With keep-alive enabled, the maximum number of requests handled per socket
    processor.setMaxKeepAliveRequests(proto.getMaxKeepAliveRequests());
    // 3. The keep-alive timeout
    processor.setKeepAliveTimeout(proto.getKeepAliveTimeout());
    // 4. Default timeout for HTTP file uploads (300 * 1000 ms)
    processor.setConnectionUploadTimeout(proto.getConnectionUploadTimeout());
    processor.setDisableUploadTimeout(proto.getDisableUploadTimeout());
    // 5. Response bodies larger than this size are compressed with gzip
    processor.setCompressionMinSize(proto.getCompressionMinSize());
    // 6. Whether compression is enabled
    processor.setCompression(proto.getCompression());
    processor.setNoCompressionUserAgents(proto.getNoCompressionUserAgents());
    // 7. Only bodies of type "text/html,text/xml,text/plain" are compressed
    processor.setCompressableMimeTypes(proto.getCompressableMimeTypes());
    processor.setRestrictedUserAgents(proto.getRestrictedUserAgents());
    // 8. The socket buffer, 9000 by default
    processor.setSocketBuffer(proto.getSocketBuffer());
    // 9. Maximum size of a saved POST, 4 * 1000
    processor.setMaxSavePostSize(proto.getMaxSavePostSize());
    processor.setServer(proto.getServer());
    processor.setDisableKeepAlivePercentage(proto.getDisableKeepAlivePercentage());
    register(processor);
    return processor;
}
```
What we mainly care about here is the processor's read path, which is again a single line: the call to `service()`.
```java
public abstract class AbstractProcessorLight implements Processor {

    @Override
    public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
            throws IOException {

        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
                state = dispatch(nextDispatch.getSocketStatus());
            } else if (status == SocketEvent.DISCONNECT) {
                // Do nothing here, just wait for it to get recycled
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
                state = dispatch(status);
                if (state == SocketState.OPEN) {
                    // There may be pipe-lined data to read. If the data isn't
                    // processed now, execution will exit this loop and call
                    // release() which will recycle the processor (and input
                    // buffer) deleting any pipe-lined data. To avoid this,
                    // process it now.
                    state = service(socketWrapper);
                }
            } else if (status == SocketEvent.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ) {
                // Invoke `service()`
                state = service(socketWrapper);
            } else {
                // Default to closing the socket if the SocketEvent passed in
                // is not consistent with the current state of the Processor
                state = SocketState.CLOSED;
            }

            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + socketWrapper +
                        "], Status in: [" + status +
                        "], State out: [" + state + "]");
            }

            if (state != SocketState.CLOSED && isAsync()) {
                state = asyncPostProcess();
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Socket: [" + socketWrapper +
                            "], State after async post processing: [" + state + "]");
                }
            }

            if (dispatches == null || !dispatches.hasNext()) {
                // Only returns non-null iterator if there are
                // dispatches to process.
                dispatches = getIteratorAndClearDispatches();
            }
        } while (state == SocketState.ASYNC_END ||
                dispatches != null && state != SocketState.CLOSED);

        return state;
    }
}
```
Two points in the `Processor.service()` method matter most. The method is very long (over 200 lines), so we will not copy it here. It:

- creates the Request and Response objects;
- calls the `Adapter.service()` method, passing in the Request and Response objects it created.
Adapter

The `Adapter` connects the `Connector` and the `Container`, bridging the two. The `Processor` calls the `Adapter.service()` method. Let's analyze it; it mainly does the following:
- From the coyote framework's Request and Response objects, create the connector's Request and Response objects (wrappers around HttpServletRequest and HttpServletResponse)
- Add extra headers
- Parse the request; this step also handles proxy servers, sets necessary headers, and so on
- Hand the request over to the container proper by invoking the valves of the Engine container's pipeline
- Complete the whole request via request.finishRequest() and response.finishResponse() (which flushes the data in the OutputBuffer to the browser)
```java
@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
        throws Exception {

    // 1. From the coyote framework's Request and Response, obtain or create the
    //    connector's Request and Response objects (wrappers around
    //    HttpServletRequest and HttpServletResponse)
    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);

    if (request == null) {
        // Create objects
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        response = connector.createResponse();
        response.setCoyoteResponse(res);

        // Link objects
        request.setResponse(response);
        response.setRequest(request);

        // Set as notes
        req.setNote(ADAPTER_NOTES, request);
        res.setNote(ADAPTER_NOTES, response);

        // Set query string encoding
        req.getParameters().setQueryStringCharset(connector.getURICharset());
    }

    // 2. Add extra headers
    if (connector.getXpoweredBy()) {
        response.addHeader("X-Powered-By", POWERED_BY);
    }

    boolean async = false;
    boolean postParseSuccess = false;

    req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

    try {
        // Parse and set Catalina and configuration specific
        // request parameters
        // 3. Parse the request: handle proxy servers, set necessary headers, etc.
        //    Performs the request mapping (resolves Host, Context, Wrapper, the
        //    parameters after the URI, and the session id)
        postParseSuccess = postParseRequest(req, request, res, response);
        if (postParseSuccess) {
            //check valves if we support async
            request.setAsyncSupported(
                    connector.getService().getContainer().getPipeline().isAsyncSupported());
            // Calling the container
            // 4. This is where execution really enters the container: invoke the
            //    valves of the Engine container's pipeline
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
        }
        if (request.isAsync()) {
            async = true;
            ReadListener readListener = req.getReadListener();
            if (readListener != null && request.isFinished()) {
                // Possible the all data may have been read during service()
                // method so this needs to be checked here
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    if (req.sendAllDataReadEvent()) {
                        req.getReadListener().onAllDataRead();
                    }
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            }

            Throwable throwable =
                    (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

            // If an async request was started, is not going to end once
            // this container thread finishes and an error occurred, trigger
            // the async error process
            if (!request.isAsyncCompleting() && throwable != null) {
                request.getAsyncContextInternal().setErrorState(throwable, true);
            }
        } else {
            // 5. Complete the whole request via request.finishRequest() and
            //    response.finishResponse() (flush the OutputBuffer to the browser)
            request.finishRequest();
            // Flush the data in the OutputBuffer of org.apache.catalina.connector.Response
            // into the InternalOutputBuffer of org.apache.coyote.Response, and finally
            // write it out through the socket's OutputStream (this assembles the headers
            // and body of the HTTP response and flushes them to the remote peer)
            response.finishResponse();
        }

    } catch (IOException e) {
        // Ignore
    } finally {
        AtomicBoolean error = new AtomicBoolean(false);
        res.action(ActionCode.IS_ERROR, error);

        if (request.isAsyncCompleting() && error.get()) {
            // Connection will be forcibly closed which will prevent
            // completion happening at the usual point. Need to trigger
            // call to onComplete() here.
            res.action(ActionCode.ASYNC_POST_PROCESS, null);
            async = false;
        }

        // Access log
        if (!async && postParseSuccess) {
            // Log only if processing was invoked.
            // If postParseRequest() failed, it has already logged it.
            Context context = request.getContext();
            // If the context is null, it is likely that the endpoint was
            // shutdown, this connection closed and the request recycled in
            // a different thread. That thread will have updated the access
            // log so it is OK not to update the access log here in that
            // case.
            if (context != null) {
                context.logAccess(request, response,
                        System.currentTimeMillis() - req.getStartTime(), false);
            }
        }

        req.getRequestProcessor().setWorkerThreadName(null);

        // Recycle the wrapper request and response
        if (!async) {
            request.recycle();
            response.recycle();
        }
    }
}
```
Request pre-processing

The postParseRequest method pre-processes the request: it strips path parameters introduced by a semicolon, URL-decodes the URI, and normalizes it (resolving "." and ".." segments).
```java
protected boolean postParseRequest(org.apache.coyote.Request req, Request request,
        org.apache.coyote.Response res, Response response)
        throws IOException, ServletException {

    // Some code omitted

    MessageBytes decodedURI = req.decodedURI();

    if (undecodedURI.getType() == MessageBytes.T_BYTES) {
        // Copy the raw URI to the decodedURI
        decodedURI.duplicate(undecodedURI);

        // Parse the path parameters. This will:
        //   - strip out the path parameters
        //   - convert the decodedURI to bytes
        parsePathParameters(req, request);

        // URI decoding
        // %xx decoding of the URL
        try {
            req.getURLDecoder().convert(decodedURI, false);
        } catch (IOException ioe) {
            res.setStatus(400);
            res.setMessage("Invalid URI: " + ioe.getMessage());
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
        // Normalization
        if (!normalize(req.decodedURI())) {
            res.setStatus(400);
            res.setMessage("Invalid URI");
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
        // Character decoding
        convertURI(decodedURI, request);
        // Check that the URI is still normalized
        if (!checkNormalize(req.decodedURI())) {
            res.setStatus(400);
            res.setMessage("Invalid URI character encoding");
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
    } else {
        /* The URI is chars or String, and has been sent using an in-memory
         * protocol handler. The following assumptions are made:
         * - req.requestURI() has been set to the 'original' non-decoded,
         *   non-normalized URI
         * - req.decodedURI() has been set to the decoded, normalized form
         *   of req.requestURI()
         */
        decodedURI.toChars();
        // Remove all path parameters; any needed path parameter should be set
        // using the request object rather than passing it in the URL
        CharChunk uriCC = decodedURI.getCharChunk();
        int semicolon = uriCC.indexOf(';');
        if (semicolon > 0) {
            decodedURI.setChars(uriCC.getBuffer(), uriCC.getStart(), semicolon);
        }
    }

    // Request mapping.
    MessageBytes serverName;
    if (connector.getUseIPVHosts()) {
        serverName = req.localName();
        if (serverName.isNull()) {
            // well, they did ask for it
            res.action(ActionCode.REQ_LOCAL_NAME_ATTRIBUTE, null);
        }
    } else {
        serverName = req.serverName();
    }

    // Version for the second mapping loop and
    // Context that we expect to get for that version
    String version = null;
    Context versionContext = null;
    boolean mapRequired = true;

    while (mapRequired) {
        // This will map the the latest version by default
        connector.getService().getMapper().map(serverName, decodedURI,
                version, request.getMappingData());
        // Some code omitted
    }

    // Some code omitted
}
```
Taking the case where the MessageBytes type is T_BYTES as an example:

- the parsePathParameters method strips the semicolon-delimited path parameters from the URI;
- req.getURLDecoder() returns a UDecoder instance whose convert method decodes the URI; this decoding merely removes each percent sign and replaces the three-character percent-encoded sequence with the byte whose value is given by the two hexadecimal digits that follow it;
- the normalize method normalizes the URI, resolving the "." and ".." path segments;
- the convertURI method converts the bytes of the URI into characters using the connector's URIEncoding attribute;
- note the line connector.getService().getMapper().map(serverName, decodedURI, version, request.getMappingData()): when the Service started earlier, the MapperListener registered all the Hosts and Contexts within the Service. When selecting a Context by URI, the Mapper's map method compares the URI decoded by convertURI against each Context's path.
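To make the first three steps concrete, here is a deliberately simplified sketch of percent-decoding and normalization. This is NOT Tomcat's UDecoder/normalize code (the real versions work on byte chunks in place and reject malformed input); it only illustrates what the two transformations do to a URI string:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class UriPreprocess {

    // %xx decoding: replace each three-character percent sequence with the
    // character whose value is given by the two hex digits, as UDecoder.convert does
    static String percentDecode(String uri) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < uri.length(); i++) {
            char c = uri.charAt(i);
            if (c == '%' && i + 2 < uri.length()) {
                out.append((char) Integer.parseInt(uri.substring(i + 1, i + 3), 16));
                i += 2; // skip the two hex digits just consumed
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    // Normalization: resolve "." and ".." path segments, as normalize() does
    static String normalize(String uri) {
        Deque<String> segments = new ArrayDeque<>();
        for (String seg : uri.split("/")) {
            if (seg.isEmpty() || seg.equals(".")) continue; // "." is a no-op
            if (seg.equals("..")) segments.pollLast();      // ".." removes the previous segment
            else segments.addLast(seg);
        }
        return "/" + String.join("/", segments);
    }
}
```

For example, `percentDecode("/a%20b")` yields `"/a b"`, and `normalize("/app/./x/../y")` yields `"/app/y"`.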
Container processing

If the request can be handed to the container's pipeline, i.e. when the postParseRequest method returns true, the container carries on with the processing. The service method contains the line connector.getService().getContainer().getPipeline().getFirst().invoke(request, response):

- the Connector's getService() returns the StandardService;
- the StandardService's getContainer() returns the StandardEngine;
- the StandardEngine's getPipeline() returns its associated StandardPipeline;
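The pipeline that getFirst().invoke(...) walks through is a chain-of-responsibility of valves ending in a basic valve that hands the request to the next container level. The sketch below is a hypothetical, heavily simplified model of that pattern (these are not the real org.apache.catalina types, and for brevity addValve only supports a single extra valve):

```java
import java.util.List;

interface Valve {
    void setNext(Valve v);
    void invoke(List<String> trace); // trace stands in for (request, response)
}

abstract class ValveBase implements Valve {
    protected Valve next;
    public void setNext(Valve v) { next = v; }
}

// A configurable valve: does its own work, then delegates down the chain
class LoggingValve extends ValveBase {
    public void invoke(List<String> trace) {
        trace.add("logging");
        if (next != null) next.invoke(trace);
    }
}

// Plays the role of the basic valve (e.g. StandardEngineValve), which would
// select the matching child container and invoke its pipeline
class BasicValve extends ValveBase {
    public void invoke(List<String> trace) {
        trace.add("basic");
    }
}

class Pipeline {
    private Valve first;
    private final Valve basic = new BasicValve();

    // New valves are linked in front of the basic valve
    void addValve(Valve v) {
        v.setNext(basic);
        if (first == null) first = v;
    }

    Valve getFirst() { return first != null ? first : basic; }
}
```

Calling getFirst().invoke(trace) on a pipeline with one LoggingValve records "logging" followed by "basic", mirroring how each valve in Tomcat runs before delegating to the next one.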
The subsequent processing flow is covered in the next article.