Tomcat的Connector(Protocol,CoyoteAdapterAdapter,AprEndPoint)初始化及请求处理过程
程序员文章站
2022-05-06 14:37:44
...
Tomcat的Server初始化及启动过程:http://donald-draper.iteye.com/blog/2327060
Tomcat的connector:http://hill007299.iteye.com/blog/1757198
阻塞队列--LinkedBlockingQueue:http://www.cnblogs.com/linjiqin/p/5128048.html
Socket处理(2) - AprEndpoint:http://blog.sina.com.cn/s/blog_53b51a980100qejl.html
在Server的启动过程中,我们看到了Connectors的初始化与启动,那时我们没有深入分析
,今天我们来看一下:
//Connector
//Http11Protocol
查看Http11Protocol协议 ,Http11Protocol无init方法,再查看
//SSLImplementation
分析AbstractHttp11JsseProtocol初始化,首先获取SSLImplementation实例
,调用父类的init
查看AbstractHttp11Protocol,无初始化方法
查看AbstractProtocol
AbstractProtocol初始化最重要的一点是网络IO Handler初始化,下面
我们来看AbstractEndpoint的初始化:
而AbstractEndpoint有JIoEndpoint,NioEndpoint,AprEndpoint子类实现
在serverx.xml中有这么一段:
JIoEndpoint,基于java bio实现,特点是每建立一个连接分配一个线程,读数据阻塞。
NioEndpoint,使用java nio实现,使用反应器模式,线程和连接解绑,多路复用。
AprEndpoint,使用Apache Portable Runtime实现,直接调用native方法,有更高的效率,但是实现依赖具体平台。
调试追踪Tomcat7.0.70发现如下图:
这里我们来看一下AprEndpoint
查看AbstractEndpoint
//创建线程执行器
//查看TaskQueue
//查看TaskThreadFactory
查看ThreadPoolExecutor
ThreadPoolExecutor继承了java.util.concurrent.ThreadPoolExecutor
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {}
查看Poller线程的初始化和启动,
Poller类主要负责poll传入的socket连接(提供add(),由其他线程传入),如果发现有事件,交给AjpConnectionHandler处理。
// Start poller thread
Poller是AprEndPointer的内部类
来看SocketProcessor
根据handler.process(socket, status),我们去查看handler是如何处理请求的在Http11Protocol的
构造方法中可以看到handler就是Http11ConnectionHandler
查看Http11ConnectionHandler是Http11Protocol的内部类
而Http11ConnectionHandler没有process(socket, status),这可方法定已在Handler在,是一个抽象方法
查看AbstractConnectionHandler,为AbstractProtocol的内部类
查看Http11AprProcessor
无处理函数process(socket, status)
查看AbstractHttp11Processor
从AbstractHttp11Processor的process(SocketWrapper<S> socketWrapper)可以看出,
Http处理器,首先解析HTTP的头部协议,过滤器,以及编码问题,然后交给Adapter去处理
来看CoyoteAdapter
前文中我们分析过,每个connector关联一个Service,每个Service有一个Engine,每个Engine有多个Host,每个
Host有一个Pipeline(请求处理链),而Pipeline中是StandardWrappeValve链,StandardWrappeValve是处理
Request的Servlet包装类,那么,此时应该理解这句的含义了吧!
从上面可以看出Poller,调用本地方创建socket监听线程,当监听到连接时,创一个SocketProcessor,即socket处理线程SOCKET处理线程,委托给Http11ConnectionHandler(Http11ConnectionHandler是Http11Protocol的内部类)处理,
Http11ConnectionHandler继承AbstractConnectionHandler,AbstractConnectionHandler处理请求是获取precessor,precessor从Http11ConnectionHandler获取,实际为Http11AprProcessor,Http11AprProcessor继承AbstractHttp11Processor,并处理请求在process(SocketWrapper<S> socketWrapper)中,在这个方法中处理请求头部,再讲实际业务逻辑处理委托CoyoteAdapter的service方法,service方法通过反射调用对应的servlet的service方法来处理逻辑。
在这里,简单总结一下,每个Connector关联一个Http11Protocol,Http11Protocol关联一个Http11ConnectionHandler,同时Http11Protocol关联一个Endpoint和Adapter,这个Http11ConnectionHandler实际要注入到Endpoint中,每个handler管理创建Http11AprProcessor,Http11AprProcessor关联一个Adapter(从Http11Protocol获取),
而Adapter关联一个Connector,Endpoint处理Socket的请求实际通过Http11AprProcessor,Http11AprProcessor委托给Adapter,Adapter将请求委托给Servlet。
上面一部分的堆栈有点深,现在退出来,回到AprEndpoint的startInternal看
startAcceptorThreads();这句话
这个方法在AbstractEndPoint中
//AbstractEndPoint的抽象静态内部类Acceptor,描述Socket请求处理状态
查看AprEndpoint的createAcceptor方法
总结Connector
每个connector关联一个Service,每个Service有一个Engine,每个Engine有多个Host,每个
Host有一个Pipeline(请求处理链),而Pipeline中是StandardWrappeValve链,StandardWrappeValve是处理Request的Servlet包装类;同时每个Connector关联一个Http11Protocol,Http11Protocol关联一个Http11ConnectionHandler,同时Http11Protocol关联一个Endpoint和Adapter,这个Http11ConnectionHandler实际要注入到Endpoint中,,每个handler管理创建Http11AprProcessor,Http11AprProcessor关联一个Adapter(从Http11Protocol获取),而Adapter关联一个Connector,Endpoint处理Socket的请求实际通过Http11AprProcessor,Http11AprProcessor委托给Adapter,Adapter将请求委托给Servlet。Connector初始化最主要的是,初始化Http11Protocol,实际初始化Endpoint,EndPoint初始化bind,SOCKET地址,并监听请求及初始化SSL,Endpoint启动,即初始化Poller(处理socket请求)。
下面一篇文章我们来看看,JioEndPoint。
//ContainerThreadMarker
//Pool
//SocketWrapper
Tomcat的connector:http://hill007299.iteye.com/blog/1757198
阻塞队列--LinkedBlockingQueue:http://www.cnblogs.com/linjiqin/p/5128048.html
Socket处理(2) - AprEndpoint:http://blog.sina.com.cn/s/blog_53b51a980100qejl.html
在Server的启动过程中,我们看到了Connectors的初始化与启动,那时我们没有深入分析
,今天我们来看一下:
public class StandardService extends LifecycleMBeanBase implements Service { private static final String info = "org.apache.catalina.core.StandardService/1.0"; private String name = null; private static final StringManager sm = StringManager.getManager(Constants.Package); private Server server = null; protected Connector connectors[] = new Connector[0]; private final Object connectorsLock = new Object(); protected ArrayList<Executor> executors = new ArrayList<Executor>(); protected Container container = null; private ClassLoader parentClassLoader = null; protected void initInternal() throws LifecycleException { synchronized (connectorsLock) { for (Connector connector : connectors) { try { //初始化Connector connector.init(); } } } } protected void startInternal() throws LifecycleException { synchronized (connectorsLock) { for (Connector connector: connectors) { try { if (connector.getState() != LifecycleState.FAILED) { //启动connector connector.start(); } } } } } }
//Connector
public class Connector extends LifecycleMBeanBase { //关联Service protected Service service = null; /** The port number on which we listen for requests.*/ protected int port = -1; protected String proxyName = null; protected int proxyPort = 0; /** The redirect port for non-SSL to SSL redirects. */ protected int redirectPort = 443; protected String scheme = "http"; //Get or Post 允许最大请求参数个数 protected int maxParameterCount = 10000; //参数解析最大size,默认2MB protected int maxPostSize = 2 * 1024 * 1024; protected int maxSavePostSize = 4 * 1024; //需要解析Body的方法 protected String parseBodyMethods = "POST"; //协议处理handler Class protected String protocolHandlerClassName ="org.apache.coyote.http11.Http11Protocol"; /**Coyote protocol handler.*/ protected ProtocolHandler protocolHandler = null; /** Coyote adapter.*/ protected Adapter adapter = null; //HTTP,Servlet API protected Mapper mapper = new Mapper(); //MapperListener protected MapperListener mapperListener = new MapperListener(mapper, this); protected String URIEncoding = null; protected boolean useBodyEncodingForURI = false; //构造函数 public Connector(String protocol) { setProtocol(protocol); // Instantiate protocol handler try { Class<?> clazz = Class.forName(protocolHandlerClassName); this.protocolHandler = (ProtocolHandler) clazz.newInstance(); } } //初始化 protected void initInternal() throws LifecycleException { super.initInternal(); //新建请求处理器代理 adapter = new CoyoteAdapter(this); //设置协议处理器的Adapter protocolHandler.setAdapter(adapter); // Make sure parseBodyMethodsSet has a default if( null == parseBodyMethodsSet ) { setParseBodyMethods(getParseBodyMethods()); } if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) { throw new LifecycleException( sm.getString("coyoteConnector.protocolHandlerNoApr", getProtocolHandlerClassName())); } try { //初始化协议处理器 protocolHandler.init(); } // Initialize mapper listener //初始化,注册到JMX,实际调用的是,LifecycleMBeanBase.initInternal mapperListener.init(); } }
//Http11Protocol
/** * Abstract the protocol implementation, including threading, etc. * Processor is single threaded and specific to stream-based protocols, * will not fit Jk protocols like JNI. */ public class Http11Protocol extends AbstractHttp11JsseProtocol<Socket> { protected Http11ConnectionHandler cHandler; private int disableKeepAlivePercentage = 75; public Http11Protocol() { //新建请求处理EndPoint为JIoEndpoint endpoint = new JIoEndpoint(); cHandler = new Http11ConnectionHandler(this); //设置endpoint的handler ((JIoEndpoint) endpoint).setHandler(cHandler); setSoLinger(Constants.DEFAULT_CONNECTION_LINGER); setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT); setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY); } }
查看Http11Protocol协议 ,Http11Protocol无init方法,再查看
AbstractHttp11JsseProtocol public abstract class AbstractHttp11JsseProtocol<S> extends AbstractHttp11Protocol<S> { protected SSLImplementation sslImplementation = null; // ------------------------------------------------------- Lifecycle methods @Override public void init() throws Exception { // SSL implementation needs to be in place before end point is // initialized //初始化SSLImplementation sslImplementation = SSLImplementation.getInstance(sslImplementationName); super.init(); } }
//SSLImplementation
public abstract class SSLImplementation { private static final String[] implementations = { JSSEImplementationClass }; //获取SSL实现类 public static SSLImplementation getInstance(String className) throws ClassNotFoundException { if (className == null) return getInstance(); try { // Workaround for the J2SE 1.4.x classloading problem (under // Solaris). // Class.forName(..) fails without creating class using new. // This is an ugly workaround. if (JSSEImplementationClass.equals(className)) { return new org.apache.tomcat.util.net.jsse.JSSEImplementation(); } Class<?> clazz = Class.forName(className); return (SSLImplementation) clazz.newInstance(); } } public static SSLImplementation getInstance() throws ClassNotFoundException { for (int i = 0; i < implementations.length; i++) { try { SSLImplementation impl = getInstance(implementations[i]); return impl; } } } }
分析AbstractHttp11JsseProtocol初始化,首先获取SSLImplementation实例
,调用父类的init
查看AbstractHttp11Protocol,无初始化方法
public abstract class AbstractHttp11Protocol<S> extends AbstractProtocol<S> { @Override protected String getProtocolName() { return "Http"; } }
查看AbstractProtocol
public abstract class AbstractProtocol<S> implements ProtocolHandler, MBeanRegistration { //Name of MBean for the Global Request Processor. protected ObjectName rgOname = null; /** Name of MBean for the ThreadPool.*/ protected ObjectName tpOname = null; /** * Unique ID for this connector. Only used if the connector is configured * to use a random port as the port will change if stop(), start() is * called. */ private int nameIndex = 0; /** * Endpoint that provides low-level network I/O - must be matched to the * ProtocolHandler implementation (ProtocolHandler using BIO, requires BIO * Endpoint etc.). */ protected AbstractEndpoint<S> endpoint = null; /** * The adapter provides the link between the ProtocolHandler and the * connector. */ protected Adapter adapter; // ------------------------------------------------------- Lifecycle methods /* * NOTE: There is no maintenance of state or checking for valid transitions * within this class. It is expected that the connector will maintain state * and prevent invalid state transitions. */ @Override public void init() throws Exception { //初始化Global Request Processor Mbean,并注册到JMX if (oname == null) { // Component not pre-registered so register it oname = createObjectName(); if (oname != null) { Registry.getRegistry(null, null).registerComponent(this, oname, null); } } //初始化MBean for the ThreadPool,并注册到JMX if (this.domain != null) { try { tpOname = new ObjectName(domain + ":" + "type=ThreadPool,name=" + getName()); Registry.getRegistry(null, null).registerComponent(endpoint, tpOname, null); } } rgOname=new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName()); Registry.getRegistry(null, null).registerComponent( getHandler().getGlobal(), rgOname, null ); } String endpointName = getName(); endpoint.setName(endpointName.substring(1, endpointName.length()-1)); try { //网络IO Handler初始化 endpoint.init(); } } //启动 @Override public void start() throws Exception { try { //启动endPoint endpoint.start(); } } }
AbstractProtocol初始化最重要的一点是网络IO Handler初始化,下面
我们来看AbstractEndpoint的初始化:
public abstract class AbstractEndpoint<S> { public static interface Handler { /** * Different types of socket states to react upon. */ public enum SocketState { // TODO Add a new state to the AsyncStateMachine and remove // ASYNC_END (if possible) OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING_TOMCAT, UPGRADING, UPGRADED } } //绑定状态枚举 protected enum BindState { UNBOUND, BOUND_ON_INIT, BOUND_ON_START } //http请求接受器Acceptor public abstract static class Acceptor implements Runnable { public enum AcceptorState { NEW, RUNNING, PAUSED, ENDED } protected volatile AcceptorState state = AcceptorState.NEW; public final AcceptorState getState() { return state; } } /** * Threads used to accept new connections and pass them to worker threads. */ protected Acceptor[] acceptors; /** * External Executor based thread pool. */ private Executor executor = null; /** * Server socket port. */ private int port; /** * Address for the server socket. */ private InetAddress address; /** * Controls when the Endpoint binds the port. <code>true</code>, the default * binds the port on {@link #init()} and unbinds it on {@link #destroy()}. * If set to <code>false</code> the port is bound on {@link #start()} and * unbound on {@link #stop()}. */ private boolean bindOnInit = true; private BindState bindState = BindState.UNBOUND; /** * Keepalive timeout, if not set the soTimeout is used. */ private Integer keepAliveTimeout = null; /** * SSL engine. */ private boolean SSLEnabled = false; /** * Socket timeout. */ public int getSoTimeout() { return socketProperties.getSoTimeout(); } /** * The default is true - the created threads will be * in daemon mode. If set to false, the control thread * will not be daemon - and will keep the process alive. */ private boolean daemon = true; /** * Priority of the worker threads. */ protected int threadPriority = Thread.NORM_PRIORITY; /* * NOTE: There is no maintenance of state or checking for valid transitions * within this class other than ensuring that bind/unbind are called in the * right place. It is expected that the calling code will maintain state and * prevent invalid state transitions. */ //看到这几个函数,是否有种似曾相识的感觉 public abstract void bind() throws Exception; public abstract void unbind() throws Exception; public abstract void startInternal() throws Exception; public abstract void stopInternal() throws Exception; //初始化 public final void init() throws Exception { testServerCipherSuitesOrderSupport(); //bindOnInit默认为true, if (bindOnInit) { //而bind为抽象函数,待子类扩展 bind(); bindState = BindState.BOUND_ON_INIT; } } //启动 public final void start() throws Exception { if (bindState == BindState.UNBOUND) { bind(); bindState = BindState.BOUND_ON_START; } //而startInternal为抽象函数,待子类扩展 startInternal(); } }
而AbstractEndpoint有JIoEndpoint,NioEndpoint,AprEndpoint子类实现
在serverx.xml中有这么一段:
<!-- A "Connector" represents an endpoint by which requests are received and responses are returned. Documentation at : Java HTTP Connector: /docs/config/http.html (blocking & non-blocking) Java AJP Connector: /docs/config/ajp.html APR (HTTP/AJP) Connector: /docs/apr.html Define a non-SSL HTTP/1.1 Connector on port 8080 --> <Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" />
JIoEndpoint,基于java bio实现,特点是每建立一个连接分配一个线程,读数据阻塞。
NioEndpoint,使用java nio实现,使用反应器模式,线程和连接解绑,多路复用。
AprEndpoint,使用Apache Portable Runtime实现,直接调用native方法,有更高的效率,但是实现依赖具体平台。
调试追踪Tomcat7.0.70发现如下图:
这里我们来看一下AprEndpoint
/** * APR tailored thread pool, providing the following services: * [list] * [*]Socket acceptor thread * [*]Socket poller thread * [*]Sendfile thread * [*]Worker threads pool * [/list] */ public class AprEndpoint extends AbstractEndpoint<Long> { protected static final Set<String> SSL_PROTO_ALL = new HashSet<String>(); static { /* Default used if SSLProtocol is not configured, also used if SSLProtocol="All" */ SSL_PROTO_ALL.add(Constants.SSL_PROTO_TLSv1); SSL_PROTO_ALL.add(Constants.SSL_PROTO_TLSv1_1); SSL_PROTO_ALL.add(Constants.SSL_PROTO_TLSv1_2); } /** * Root APR memory pool. */ protected long rootPool = 0; /** * Server socket "pointer". */ protected long serverSock = 0; /** * APR memory pool for the server socket. */ protected long serverSockPool = 0; /** * SSL context. */ protected long sslContext = 0; private final Map<Long,AprSocketWrapper> connections = new ConcurrentHashMap<Long, AprSocketWrapper>(); // ------------------------------------------------------------ Constructor public AprEndpoint() { // Need to override the default for maxConnections to align it with what // was pollerSize (before the two were merged) setMaxConnections(8 * 1024); } private static class AprSocketWrapper extends SocketWrapper<Long> { // This field should only be used by Poller#run() private int pollerFlags = 0; public AprSocketWrapper(Long socket) { super(socket); } } /** * Handling of accepted sockets. */ protected Handler handler = null; /** * Poll interval, in microseconds. The smaller the value, the more CPU the poller * will use, but the more responsive to activity it will be. */ protected int pollTime = 2000; /** * Allow comet request handling. */ protected boolean useComet = true; /** * The socket poller. */ protected Poller poller = null; /** * SSL protocols. */ protected String SSLProtocol = "all"; /** * SSL password (if a cert is encrypted, and no password has been provided, a callback * will ask for a password). */ protected String SSLPassword = null; /** * SSL certificate file. */ protected String SSLCertificateFile = null; /** * SSL certificate key file. */ protected String SSLCertificateKeyFile = null; /** * SSL CA certificate path. */ protected String SSLCACertificatePath = null; /** * SSL CA certificate file. */ protected String SSLCACertificateFile = null; //初始化EndPoint,所有关键工作都在这里 @Override public void bind() throws Exception { // Create the root APR memory pool try { //调用本地方法,创建apr内存池 rootPool = Pool.create(0); } // Create the pool for the server socket serverSockPool = Pool.create(rootPool); // Create the APR address that will be bound String addressStr = null; if (getAddress() != null) { addressStr = getAddress().getHostAddress(); } int family = Socket.APR_INET; if (Library.APR_HAVE_IPV6) { if (addressStr == null) { if (!OS.IS_BSD && !OS.IS_WIN32 && !OS.IS_WIN64) family = Socket.APR_UNSPEC; } else if (addressStr.indexOf(':') >= 0) { family = Socket.APR_UNSPEC; } } long inetAddress = Address.info(addressStr, family, getPort(), 0, rootPool); // Create the APR server socket serverSock = Socket.create(Address.getInfo(inetAddress).family, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, rootPool); if (OS.IS_UNIX) { Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1); } // Deal with the firewalls that tend to drop the inactive sockets Socket.optSet(serverSock, Socket.APR_SO_KEEPALIVE, 1); //server socket地址绑定 int ret = Socket.bind(serverSock, inetAddress); // Start listening on the server socket ret = Socket.listen(serverSock, getBacklog()); if (OS.IS_WIN32 || OS.IS_WIN64) { // On Windows set the reuseaddr flag after the bind/listen Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1); } // Sendfile usage on systems which don't support it cause major problems if (useSendfile && !Library.APR_HAS_SENDFILE) { useSendfile = false; } // Initialize thread count default for acceptor if (acceptorThreadCount == 0) { // FIXME: Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } // Delay accepting of new connections until data is available // Only Linux kernels 2.4 + have that implemented // on other platforms this call is noop and will return APR_ENOTIMPL. if (deferAccept) { if (Socket.optSet(serverSock, Socket.APR_TCP_DEFER_ACCEPT, 1) == Status.APR_ENOTIMPL) { deferAccept = false; } } //如果SSL开启,这SSL上下文 if (isSSLEnabled()) { if (SSLCertificateFile == null) { // This is required throw new Exception(sm.getString("endpoint.apr.noSslCertFile")); } // SSL protocol int value = SSL.SSL_PROTOCOL_NONE; if (SSLProtocol == null || SSLProtocol.length() == 0) { value = SSL.SSL_PROTOCOL_ALL; } else { Set<String> protocols = new HashSet<String>(); // List of protocol names, separated by "+" or "-". // Semantics is adding ("+") or removing ("-") from left // to right, starting with an empty protocol set. // Tokens are individual protocol names or "all" for a // default set of supported protocols. // Split using a positive lookahead to keep the separator in // the capture so we can check which case it is. for (String protocol : SSLProtocol.split("(?=[-+])")) { String trimmed = protocol.trim(); // Ignore token which only consists of prefix character if (trimmed.length() > 1) { if (trimmed.charAt(0) == '-') { trimmed = trimmed.substring(1).trim(); if (trimmed.equalsIgnoreCase(Constants.SSL_PROTO_ALL)) { protocols.removeAll(SSL_PROTO_ALL); } else { protocols.remove(trimmed); } } else { if (trimmed.charAt(0) == '+') { trimmed = trimmed.substring(1).trim(); } if (trimmed.equalsIgnoreCase(Constants.SSL_PROTO_ALL)) { protocols.addAll(SSL_PROTO_ALL); } else { protocols.add(trimmed); } } } } for (String protocol : protocols) { if (Constants.SSL_PROTO_SSLv2.equalsIgnoreCase(protocol)) { value |= SSL.SSL_PROTOCOL_SSLV2; } else if (Constants.SSL_PROTO_SSLv3.equalsIgnoreCase(protocol)) { value |= SSL.SSL_PROTOCOL_SSLV3; } else if (Constants.SSL_PROTO_TLSv1.equalsIgnoreCase(protocol)) { value |= SSL.SSL_PROTOCOL_TLSV1; } else if (Constants.SSL_PROTO_TLSv1_1.equalsIgnoreCase(protocol)) { value |= SSL.SSL_PROTOCOL_TLSV1_1; } else if (Constants.SSL_PROTO_TLSv1_2.equalsIgnoreCase(protocol)) { value |= SSL.SSL_PROTOCOL_TLSV1_2; } else { // Protocol not recognized, fail to start as it is safer than // continuing with the default which might enable more than the // is required throw new Exception(sm.getString( "endpoint.apr.invalidSslProtocol", SSLProtocol)); } } } // Create SSL Context try { sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER); } if (SSLInsecureRenegotiation) { boolean legacyRenegSupported = false; try { legacyRenegSupported = SSL.hasOp(SSL.SSL_OP_ALLOW_UNSAFE_LEGACY_RENEGOTIATION); if (legacyRenegSupported) SSLContext.setOptions(sslContext, SSL.SSL_OP_ALLOW_UNSAFE_LEGACY_RENEGOTIATION); } } // Set cipher order: client (default) or server if (SSLHonorCipherOrder) { boolean orderCiphersSupported = false; try { orderCiphersSupported = SSL.hasOp(SSL.SSL_OP_CIPHER_SERVER_PREFERENCE); if (orderCiphersSupported) SSLContext.setOptions(sslContext, SSL.SSL_OP_CIPHER_SERVER_PREFERENCE); } } // Disable compression if requested if (SSLDisableCompression) { boolean disableCompressionSupported = false; try { disableCompressionSupported = SSL.hasOp(SSL.SSL_OP_NO_COMPRESSION); if (disableCompressionSupported) SSLContext.setOptions(sslContext, SSL.SSL_OP_NO_COMPRESSION); } } // List the ciphers that the client is permitted to negotiate SSLContext.setCipherSuite(sslContext, SSLCipherSuite); // Load Server key and certificate SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA); // Set certificate chain file SSLContext.setCertificateChainFile(sslContext, SSLCertificateChainFile, false); // Support Client Certificates SSLContext.setCACertificate(sslContext, SSLCACertificateFile, SSLCACertificatePath); // Set revocation SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath); // Client certificate verification value = SSL.SSL_CVERIFY_NONE; if ("optional".equalsIgnoreCase(SSLVerifyClient)) { value = SSL.SSL_CVERIFY_OPTIONAL; } else if ("require".equalsIgnoreCase(SSLVerifyClient)) { value = SSL.SSL_CVERIFY_REQUIRE; } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) { value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA; } SSLContext.setVerify(sslContext, value, SSLVerifyDepth); // For now, sendfile is not supported with SSL useSendfile = false; } } /** * Start the APR endpoint, creating acceptor, poller and sendfile threads. */ @Override public void startInternal() throws Exception { if (!running) { running = true; paused = false; // Create worker collection if (getExecutor() == null) { //AbstractEndpoint创建线程执行器, createExecutor(); } //初始化连接共享锁 initializeConnectionLatch(); // Start poller thread //bind监听端口地址,并监听处理HTTP(TCP)请求,初始化SSL poller = new Poller(); poller.init(); Thread pollerThread = new Thread(poller, getName() + "-Poller"); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); // Start sendfile thread if (useSendfile) { sendfile = new Sendfile(); sendfile.init(); Thread sendfileThread = new Thread(sendfile, getName() + "-Sendfile"); sendfileThread.setPriority(threadPriority); sendfileThread.setDaemon(true); sendfileThread.start(); } startAcceptorThreads(); // Start async timeout thread asyncTimeout = new AsyncTimeout(); Thread timeoutThread = new Thread(asyncTimeout, getName() + "-AsyncTimeout"); timeoutThread.setPriority(threadPriority); timeoutThread.setDaemon(true); timeoutThread.start(); } }
查看AbstractEndpoint
//创建线程执行器
public void createExecutor() { internalExecutor = true; //新建任务队列 TaskQueue taskqueue = new TaskQueue(); //新建任务线程工程 TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); //初始化线程执行器 executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); }
//查看TaskQueue
/** * As task queue specifically designed to run with a thread pool executor. * The task queue is optimised to properly utilize threads within * a thread pool executor. If you use a normal queue, the executor will spawn threads * when there are idle threads and you wont be able to force items unto the queue itself * */ TaskQueue继承了LinkedBlockingQueue public class TaskQueue extends LinkedBlockingQueue<Runnable> { private ThreadPoolExecutor parent = null; // no need to be volatile, the one times when we change and read it occur in // a single thread (the one that did stop a context and fired listeners) private Integer forcedRemainingCapacity = null; @Override //如果可能的话,将Runnable加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。 public boolean offer(Runnable o) { //we can't do any checks if (parent==null) return super.offer(o); //we are maxed out on threads, simply queue the object if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); //we have idle threads, just add it to the queue if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o); //if we have less threads than maximum force creation of a new thread if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; //if we reached here, we need to add it to the queue return super.offer(o); } @Override //获取并移除此队列的头Runnable,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { Runnable runnable = super.poll(timeout, unit); if (runnable == null && parent != null) { // the poll timed out, it gives an opportunity to stop the current // thread if needed to avoid memory leaks. parent.stopCurrentThreadIfNeeded(); } return runnable; } @Override //获取BlockingQueue里排在首位的对象Runnable,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止 public Runnable take() throws InterruptedException { if (parent != null && parent.currentThreadShouldBeStopped()) { return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); // yes, this may return null (in case of timeout) which normally // does not occur with take() // but the ThreadPoolExecutor implementation allows this } return super.take(); } }
//查看TaskThreadFactory
/** * Simple task thread factory to use to create threads for an executor implementation. */ public class TaskThreadFactory implements ThreadFactory { private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; private final boolean daemon; private final int threadPriority; public TaskThreadFactory(String namePrefix, boolean daemon, int priority) { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); this.namePrefix = namePrefix; this.daemon = daemon; this.threadPriority = priority; } @Override public Thread newThread(Runnable r) { TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement()); t.setDaemon(daemon); t.setPriority(threadPriority); return t; } }
查看ThreadPoolExecutor
ThreadPoolExecutor继承了java.util.concurrent.ThreadPoolExecutor
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {}
查看Poller线程的初始化和启动,
Poller类主要负责poll传入的socket连接(提供add(),由其他线程传入),如果发现有事件,交给AjpConnectionHandler处理。
// Start poller thread
poller = new Poller(); poller.init(); Thread pollerThread = new Thread(poller, getName() + "-Poller"); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start();
Poller是AprEndPointer的内部类
// ------------------------------------------------------ Poller Inner Class public class Poller implements Runnable { /** Pointers to the pollers.*/ protected long[] pollers = null; /** Actual poller size. */ protected int actualPollerSize = 0; /** Amount of spots left in the poller. */ protected int[] pollerSpace = null; /** Amount of low level pollers in use by this poller.*/ protected int pollerCount; /** Timeout value for the poll call. */ protected int pollerTime; /** * Variable poller timeout that adjusts depending on how many poll sets * are in use so that the total poll time across all poll sets remains * equal to pollTime. */ private int nextPollerTime; /** Root pool. */ protected long pool = 0; /** Socket descriptors.*/ protected long[] desc; /** List of sockets to be added to the poller.*/ protected SocketList addList = null; /** List of sockets to be closed.*/ private SocketList closeList = null; /** * Structure used for storing timeouts. */ protected SocketTimeouts timeouts = null; /** * Last run of maintain. Maintain will run approximately once every one * second (may be slightly longer between runs). */ protected long lastMaintain = System.currentTimeMillis(); /** * The number of connections currently inside this Poller. The correct * operation of the Poller depends on this figure being correct. If it * is not, it is possible that the Poller will enter a wait loop where * it waits for the next connection to be added to the Poller before it * calls poll when it should still be polling existing connections. * Although not necessary at the time of writing this comment, it has * been implemented as an AtomicInteger to ensure that it remains * thread-safe. */ private AtomicInteger connectionCount = new AtomicInteger(0); public int getConnectionCount() { return connectionCount.get(); } private volatile boolean pollerRunning = true; /** * Create the poller. With some versions of APR, the maximum poller size * will be 62 (recompiling APR is necessary to remove this limitation). */ //初始化Poller protected void init() { pool = Pool.create(serverSockPool); // Single poller by default int defaultPollerSize = getMaxConnections(); if ((OS.IS_WIN32 || OS.IS_WIN64) && (defaultPollerSize > 1024)) { // The maximum per poller to get reasonable performance is 1024 // Adjust poller size so that it won't reach the limit. This is // a limitation of XP / Server 2003 that has been fixed in // Vista / Server 2008 onwards. actualPollerSize = 1024; } else { actualPollerSize = defaultPollerSize; } timeouts = new SocketTimeouts(defaultPollerSize); // At the moment, setting the timeout is useless, but it could get // used again as the normal poller could be faster using maintain. // It might not be worth bothering though. long pollset = allocatePoller(actualPollerSize, pool, -1); if (pollset == 0 && actualPollerSize > 1024) { actualPollerSize = 1024; pollset = allocatePoller(actualPollerSize, pool, -1); } if (pollset == 0) { actualPollerSize = 62; pollset = allocatePoller(actualPollerSize, pool, -1); } pollerCount = defaultPollerSize / actualPollerSize; pollerTime = pollTime / pollerCount; nextPollerTime = pollerTime; pollers = new long[pollerCount]; pollers[0] = pollset; for (int i = 1; i < pollerCount; i++) { pollers[i] = allocatePoller(actualPollerSize, pool, -1); } pollerSpace = new int[pollerCount]; for (int i = 0; i < pollerCount; i++) { pollerSpace[i] = actualPollerSize; } desc = new long[actualPollerSize * 2]; connectionCount.set(0); addList = new SocketList(defaultPollerSize); closeList = new SocketList(defaultPollerSize); } /** * The background thread that listens for incoming TCP/IP connections * and hands them off to an appropriate processor. */ //监听TCP/IP连接,并交给相应handler处理 @Override public void run() { // Poll for the specified interval for (int i = 0; i < pollers.length; i++) { for (int n = 0; n < rv; n++) { long timeout = timeouts.remove(desc[n*2+1]); AprSocketWrapper wrapper = connections.get( Long.valueOf(desc[n*2+1])); if (getLog().isDebugEnabled()) { log.debug(sm.getString( "endpoint.debug.pollerProcess", Long.valueOf(desc[n*2+1]), Long.valueOf(desc[n*2]))); } wrapper.pollerFlags = wrapper.pollerFlags & ~((int) desc[n*2]); // Check for failed sockets and hand this socket off to a worker //处理请求 if (wrapper.isComet()) { // Event processes either a read or a write depending on what the poller returns //APR_POLLHUP if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR) || ((desc[n*2] & Poll.APR_POLLNVAL) == Poll.APR_POLLNVAL)) { if (!processSocket(desc[n*2+1], SocketStatus.ERROR)) { // Close socket and clear pool closeSocket(desc[n*2+1]); } //APR_POLLIN } else if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) { if (wrapper.pollerFlags != 0) { add(desc[n*2+1], 1, wrapper.pollerFlags); } //OPEN_READ if (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) { // Close socket and clear pool closeSocket(desc[n*2+1]); } //APR_POLLOUT } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) { if (wrapper.pollerFlags != 0) { add(desc[n*2+1], 1, wrapper.pollerFlags); } if (!processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) { // Close socket and clear pool closeSocket(desc[n*2+1]); } } else { // Unknown event getLog().warn(sm.getString( "endpoint.apr.pollUnknownEvent", Long.valueOf(desc[n*2]))); if (!processSocket(desc[n*2+1], SocketStatus.ERROR)) { // Close socket and clear pool closeSocket(desc[n*2+1]); } } } } synchronized (this) { this.notifyAll(); } } /** * Process given socket. Called in non-comet mode, typically keep alive * or upgraded protocol. */ //处理socket事件 public boolean processSocket(long socket, SocketStatus status) { try { Executor executor = getExecutor(); if (executor == null) { log.warn(sm.getString("endpoint.warn.noExector", Long.valueOf(socket), null)); } else { SocketWrapper<Long> wrapper = connections.get(Long.valueOf(socket)); // Make sure connection hasn't been closed if (wrapper != null) { //创建SocketProcessor处理线程,并执行 executor.execute(new SocketProcessor(wrapper, status)); } } } return true; } }
来看SocketProcessor
// -------------------------------------------- SocketProcessor Inner Class /** * This class is the equivalent of the Worker, but will simply use in an * external Executor thread pool. */ protected class SocketProcessor implements Runnable { private final SocketWrapper<Long> socket; private final SocketStatus status; public SocketProcessor(SocketWrapper<Long> socket, SocketStatus status) { this.socket = socket; if (status == null) { // Should never happen throw new NullPointerException(); } this.status = status; } @Override public void run() { // Upgraded connections need to allow multiple threads to access the // connection at the same time to enable blocking IO to be used when // Servlet 3.1 NIO has been configured if (socket.isUpgraded() && SocketStatus.OPEN_WRITE == status) { synchronized (socket.getWriteThreadLock()) { doRun(); } } else { synchronized (socket) { doRun(); } } } private void doRun() { // Process the request from this socket if (socket.getSocket() == null) { // Closed in another thread return; } //handler处理请求 SocketState state = handler.process(socket, status); if (state == Handler.SocketState.CLOSED) { // Close socket and pool closeSocket(socket.getSocket().longValue()); socket.socket = null; } else if (state == Handler.SocketState.LONG) { socket.access(); if (socket.async) { waitingRequests.add(socket); } } else if (state == Handler.SocketState.ASYNC_END) { socket.access(); SocketProcessor proc = new SocketProcessor(socket, SocketStatus.OPEN_READ); getExecutor().execute(proc); } } }
根据handler.process(socket, status),我们去查看handler是如何处理请求的在Http11Protocol的
构造方法中可以看到handler就是Http11ConnectionHandler
public Http11Protocol() { //新建请求处理EndPoint为JIoEndpoint endpoint = new JIoEndpoint(); cHandler = new Http11ConnectionHandler(this); //设置endpoint的handler ((JIoEndpoint) endpoint).setHandler(cHandler); setSoLinger(Constants.DEFAULT_CONNECTION_LINGER); setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT); setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY); }
查看Http11ConnectionHandler是Http11Protocol的内部类
// ----------------------------------- Http11ConnectionHandler Inner Class protected static class Http11ConnectionHandler extends AbstractConnectionHandler<Socket, Http11Processor> implements Handler { }
而Http11ConnectionHandler没有process(socket, status),这可方法定已在Handler在,是一个抽象方法
查看AbstractConnectionHandler,为AbstractProtocol的内部类
// ------------------------------------------- Connection handler base class protected abstract static class AbstractConnectionHandler<S,P extends Processor<S>> implements AbstractEndpoint.Handler { @SuppressWarnings("deprecation") // Old HTTP upgrade method has been deprecated //处理HTTP请求 public SocketState process(SocketWrapper<S> wrapper,SocketStatus status) { if (wrapper == null) { // Nothing to do. Socket has been closed. return SocketState.CLOSED; } S socket = wrapper.getSocket(); if (socket == null) { // Nothing to do. Socket has been closed. return SocketState.CLOSED; } Processor<S> processor = connections.get(socket); if (status == SocketStatus.DISCONNECT && processor == null) { // Nothing to do. Endpoint requested a close and there is no // longer a processor associated with this socket. return SocketState.CLOSED; } wrapper.setAsync(false); ContainerThreadMarker.markAsContainerThread(); try { if (processor == null) { processor = recycledProcessors.poll(); } if (processor == null) { //创建处理器 processor = createProcessor(); } //初始化SSL,在子类中扩展 initSsl(wrapper, processor); SocketState state = SocketState.CLOSED; do { if (status == SocketStatus.CLOSE_NOW) { processor.errorDispatch(); state = SocketState.CLOSED; } else if (status == SocketStatus.DISCONNECT && !processor.isComet()) { // Do nothing here, just wait for it to get recycled // Don't do this for Comet we need to generate an end // event (see BZ 54022) } else if (processor.isAsync() || state == SocketState.ASYNC_END) { state = processor.asyncDispatch(status); if (state == SocketState.OPEN) { // release() won't get called so in case this request // takes a long time to process, remove the socket from // the waiting requests now else the async timeout will // fire getProtocol().endpoint.removeWaitingRequest(wrapper); // There may be pipe-lined data to read. If the data // isn't processed now, execution will exit this // loop and call release() which will recycle the // processor (and input buffer) deleting any // pipe-lined data. To avoid this, process it now. state = processor.process(wrapper); } } else if (processor.isComet()) { state = processor.event(status); } { state = processor.process(wrapper); } } //带子类扩展 protected abstract P createProcessor(); protected abstract void initSsl(SocketWrapper<S> socket, Processor<S> processor); protected abstract void longPoll(SocketWrapper<S> socket, Processor<S> processor); protected abstract void release(SocketWrapper<S> socket, Processor<S> processor, boolean socketClosing, boolean addToPoller); } 查看Http11ConnectionHandler,createProcessor @Override protected Http11AprProcessor createProcessor() { Http11AprProcessor processor = new Http11AprProcessor( proto.getMaxHttpHeaderSize(), (AprEndpoint)proto.endpoint, proto.getMaxTrailerSize(), proto.getAllowedTrailerHeadersAsSet(), proto.getMaxExtensionSize(), proto.getMaxSwallowSize()); //设置处理器适配器,在connector构造函数中, //adapter = new CoyoteAdapter(this); //protocolHandler.setAdapter(adapter); //有这么两句 processor.setAdapter(proto.adapter); processor.setMaxKeepAliveRequests(proto.getMaxKeepAliveRequests()); processor.setKeepAliveTimeout(proto.getKeepAliveTimeout()); processor.setConnectionUploadTimeout( proto.getConnectionUploadTimeout()); processor.setDisableUploadTimeout(proto.getDisableUploadTimeout()); processor.setCompressionMinSize(proto.getCompressionMinSize()); processor.setCompression(proto.getCompression()); processor.setNoCompressionUserAgents(proto.getNoCompressionUserAgents()); processor.setCompressableMimeTypes(proto.getCompressableMimeTypes()); processor.setRestrictedUserAgents(proto.getRestrictedUserAgents()); processor.setSocketBuffer(proto.getSocketBuffer()); processor.setMaxSavePostSize(proto.getMaxSavePostSize()); processor.setServer(proto.getServer()); processor.setClientCertProvider(proto.getClientCertProvider()); register(processor); return processor; }
查看Http11AprProcessor
public class Http11AprProcessor extends AbstractHttp11Processor<Long> {}
无处理函数process(socket, status)
查看AbstractHttp11Processor
public abstract class AbstractHttp11Processor<S> extends AbstractProcessor<S> { /** * Process pipelined HTTP requests using the specified input and output * streams. * @param socketWrapper Socket from which the HTTP requests will be read * and the HTTP responses will be written. * @throws IOException error during an I/O operation */ //处理HTTP请求 @Override public SocketState process(SocketWrapper<S> socketWrapper) throws IOException { RequestInfo rp = request.getRequestProcessor(); rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); // Setting up the I/O setSocketWrapper(socketWrapper); getInputBuffer().init(socketWrapper, endpoint); getOutputBuffer().init(socketWrapper, endpoint); while (!getErrorState().isError() && keepAlive && !comet && !isAsync() && upgradeInbound == null && httpUpgradeHandler == null && !endpoint.isPaused()) { // Parsing the request header try { setRequestLineReadTimeout(); //503 if (endpoint.isPaused()) { // 503 - Service unavailable response.setStatus(503); setErrorState(ErrorState.CLOSE_CLEAN, null); } else { keptAlive = true; // Set this every time in case limit has been changed via JMX request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount()); // Currently only NIO will ever return false here if (!getInputBuffer().parseHeaders()) { // We've read part of the request, don't recycle it // instead associate it with the socket openSocket = true; readComplete = false; break; } if (!disableUploadTimeout) { setSocketTimeout(connectionUploadTimeout); } } catch (Throwable t) { // 400 - Bad Request response.setStatus(400); setErrorState(ErrorState.CLOSE_CLEAN, t); getAdapter().log(request, response, 0); } if (!getErrorState().isError()) { // Setting up filters, and parse some request headers rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE); try { //请求预处理 prepareRequest(); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // 500 - Internal Server Error response.setStatus(500); setErrorState(ErrorState.CLOSE_CLEAN, t); getAdapter().log(request, response, 0); } } //503,400,500,看到这些数字想到了什么? // Process the request in the adapter //适配器,处理request, response if (!getErrorState().isError()) { try { rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); //这个就是关键了,service函数,这个adapter,上面已经讲过 //实际上是CoyoteAdapter adapter.service(request, response); } /** * After reading the request headers, we have to setup the request filters. */ protected void prepareRequest() { MimeHeaders headers = request.getMimeHeaders(); // Check for a full URI (including protocol://host:port/) ByteChunk uriBC = request.requestURI().getByteChunk(); if (uriBC.startsWithIgnoreCase("http", 0)) { int pos = uriBC.indexOf("://", 0, 3, 4); int uriBCStart = uriBC.getStart(); int slashPos = -1; if (pos != -1) { byte[] uriB = uriBC.getBytes(); slashPos = uriBC.indexOf('/', pos + 3); if (slashPos == -1) { slashPos = uriBC.getLength(); // Set URI as "/" request.requestURI().setBytes (uriB, uriBCStart + pos + 1, 1); } else { request.requestURI().setBytes (uriB, uriBCStart + slashPos, uriBC.getLength() - slashPos); } MessageBytes hostMB = headers.setValue("host"); hostMB.setBytes(uriB, uriBCStart + pos + 3, slashPos - pos - 3); } } } }
从AbstractHttp11Processor的process(SocketWrapper<S> socketWrapper)可以看出,
Http处理器,首先解析HTTP的头部协议,过滤器,以及编码问题,然后交给Adapter去处理
来看CoyoteAdapter
public class CoyoteAdapter implements Adapter { /** * Construct a new CoyoteProcessor associated with the specified connector. * * @param connector CoyoteConnector that owns this processor */ //一个Adapter关联一个Connector public CoyoteAdapter(Connector connector) { super(); this.connector = connector; } private Connector connector = null; /** * Encoder for the Location URL in HTTP redirects. */ protected static URLEncoder urlEncoder; //url安全字符集 static { urlEncoder = new URLEncoder(); urlEncoder.addSafeCharacter('-'); urlEncoder.addSafeCharacter('_'); urlEncoder.addSafeCharacter('.'); urlEncoder.addSafeCharacter('*'); urlEncoder.addSafeCharacter('/'); } ** * Service method. */ @Override public void service(org.apache.coyote.Request req, org.apache.coyote.Response res) throws Exception { Request request = (Request) req.getNote(ADAPTER_NOTES); Response response = (Response) res.getNote(ADAPTER_NOTES); try { // Parse and set Catalina and configuration specific // request parameters req.getRequestProcessor().setWorkerThreadName(Thread.currentThread().getName()); postParseSuccess = postParseRequest(req, request, res, response); if (postParseSuccess) { //check valves if we support async request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported()); // Calling the container //这个就是我们一直想要找的,Servlet处理request connector.getService().getContainer().getPipeline().getFirst().invoke(request, response); } }
前文中我们分析过,每个connector关联一个Service,每个Service有一个Engine,每个Engine有多个Host,每个
Host有一个Pipeline(请求处理链),而Pipeline中是StandardWrappeValve链,StandardWrappeValve是处理
Request的Servlet包装类,那么,此时应该理解这句的含义了吧!
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
从上面可以看出Poller,调用本地方创建socket监听线程,当监听到连接时,创一个SocketProcessor,即socket处理线程SOCKET处理线程,委托给Http11ConnectionHandler(Http11ConnectionHandler是Http11Protocol的内部类)处理,
Http11ConnectionHandler继承AbstractConnectionHandler,AbstractConnectionHandler处理请求是获取precessor,precessor从Http11ConnectionHandler获取,实际为Http11AprProcessor,Http11AprProcessor继承AbstractHttp11Processor,并处理请求在process(SocketWrapper<S> socketWrapper)中,在这个方法中处理请求头部,再讲实际业务逻辑处理委托CoyoteAdapter的service方法,service方法通过反射调用对应的servlet的service方法来处理逻辑。
在这里,简单总结一下,每个Connector关联一个Http11Protocol,Http11Protocol关联一个Http11ConnectionHandler,同时Http11Protocol关联一个Endpoint和Adapter,这个Http11ConnectionHandler实际要注入到Endpoint中,每个handler管理创建Http11AprProcessor,Http11AprProcessor关联一个Adapter(从Http11Protocol获取),
而Adapter关联一个Connector,Endpoint处理Socket的请求实际通过Http11AprProcessor,Http11AprProcessor委托给Adapter,Adapter将请求委托给Servlet。
上面一部分的堆栈有点深,现在退出来,回到AprEndpoint的startInternal看
startAcceptorThreads();这句话
这个方法在AbstractEndPoint中
protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new Acceptor[count]; for (int i = 0; i < count; i++) { //创建 acceptors[i] = createAcceptor(); String threadName = getName() + "-Acceptor-" + i; acceptors[i].setThreadName(threadName); Thread t = new Thread(acceptors[i], threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } } /** * Hook to allow Endpoints to provide a specific Acceptor implementation. */ //委托给子类扩展 protected abstract Acceptor createAcceptor();
//AbstractEndPoint的抽象静态内部类Acceptor,描述Socket请求处理状态
public abstract static class Acceptor implements Runnable { public enum AcceptorState { NEW, RUNNING, PAUSED, ENDED } protected volatile AcceptorState state = AcceptorState.NEW; public final AcceptorState getState() { return state; } private String threadName; protected final void setThreadName(final String threadName) { this.threadName = threadName; } protected final String getThreadName() { return threadName; } }
查看AprEndpoint的createAcceptor方法
@Override protected AbstractEndpoint.Acceptor createAcceptor() { return new Acceptor(); }
总结Connector
每个connector关联一个Service,每个Service有一个Engine,每个Engine有多个Host,每个
Host有一个Pipeline(请求处理链),而Pipeline中是StandardWrappeValve链,StandardWrappeValve是处理Request的Servlet包装类;同时每个Connector关联一个Http11Protocol,Http11Protocol关联一个Http11ConnectionHandler,同时Http11Protocol关联一个Endpoint和Adapter,这个Http11ConnectionHandler实际要注入到Endpoint中,,每个handler管理创建Http11AprProcessor,Http11AprProcessor关联一个Adapter(从Http11Protocol获取),而Adapter关联一个Connector,Endpoint处理Socket的请求实际通过Http11AprProcessor,Http11AprProcessor委托给Adapter,Adapter将请求委托给Servlet。Connector初始化最主要的是,初始化Http11Protocol,实际初始化Endpoint,EndPoint初始化bind,SOCKET地址,并监听请求及初始化SSL,Endpoint启动,即初始化Poller(处理socket请求)。
下面一篇文章我们来看看,JioEndPoint。
//ContainerThreadMarker
public class ContainerThreadMarker { private static final ThreadLocal<Boolean> marker = new ThreadLocal<Boolean>(); public static boolean isContainerThread() { Boolean flag = marker.get(); if (flag == null) { return false; } else { return flag.booleanValue(); } } public static void markAsContainerThread() { marker.set(Boolean.TRUE); } }
//Pool
public class Pool { /** * Create a new pool. * @param parent The parent pool. If this is 0, the new pool is a root * pool. If it is non-zero, the new pool will inherit all * of its parent pool's attributes, except the apr_pool_t will * be a sub-pool. * @return The pool we have just created. */ public static native long create(long parent); /** * Clear all memory in the pool and run all the cleanups. This also destroys all * subpools. * @param pool The pool to clear * This does not actually free the memory, it just allows the pool * to re-use this memory for the next allocation. */ public static native void clear(long pool); /** * Destroy the pool. This takes similar action as apr_pool_clear() and then * frees all the memory. * This will actually free the memory * @param pool The pool to destroy */ public static native void destroy(long pool); /** * Get the parent pool of the specified pool. * @param pool The pool for retrieving the parent pool. * @return The parent of the given pool. */ public static native long parentGet(long pool); /** * Determine if pool a is an ancestor of pool b * @param a The pool to search * @param b The pool to search for * @return True if a is an ancestor of b, NULL is considered an ancestor * of all pools. */ public static native boolean isAncestor(long a, long b); /* * Cleanup * * Cleanups are performed in the reverse order they were registered. That is: * Last In, First Out. A cleanup function can safely allocate memory from * the pool that is being cleaned up. It can also safely register additional * cleanups which will be run LIFO, directly after the current cleanup * terminates. Cleanups have to take caution in calling functions that * create subpools. Subpools, created during cleanup will NOT automatically * be cleaned up. In other words, cleanups are to clean up after themselves. */ /** * Register a function to be called when a pool is cleared or destroyed * @param pool The pool register the cleanup with * @param o The object to call when the pool is cleared * or destroyed * @return The cleanup handler. */ public static native long cleanupRegister(long pool, Object o); /** * Remove a previously registered cleanup function * @param pool The pool remove the cleanup from * @param data The cleanup handler to remove from cleanup */ public static native void cleanupKill(long pool, long data); /** * Register a process to be killed when a pool dies. * @param a The pool to use to define the processes lifetime * @param proc The process to register * @param how How to kill the process, one of: * <PRE> * APR_KILL_NEVER -- process is never sent any signals * APR_KILL_ALWAYS -- process is sent SIGKILL on apr_pool_t cleanup * APR_KILL_AFTER_TIMEOUT -- SIGTERM, wait 3 seconds, SIGKILL * APR_JUST_WAIT -- wait forever for the process to complete * APR_KILL_ONLY_ONCE -- send SIGTERM and then wait * </PRE> */ public static native void noteSubprocess(long a, long proc, int how); /** * Allocate a block of memory from a pool * @param p The pool to allocate from * @param size The amount of memory to allocate * @return The ByteBuffer with allocated memory */ public static native ByteBuffer alloc(long p, int size); /** * Allocate a block of memory from a pool and set all of the memory to 0 * @param p The pool to allocate from * @param size The amount of memory to allocate * @return The ByteBuffer with allocated memory */ public static native ByteBuffer calloc(long p, int size); /* * User data management */ /** * Set the data associated with the current pool * @param data The user data associated with the pool. * @param key The key to use for association * @param pool The current pool * <br><b>Warning :</b> * The data to be attached to the pool should have a life span * at least as long as the pool it is being attached to. * Object attached to the pool will be globally referenced * until the pool is cleared or dataSet is called with the null data. * @return APR Status code. */ public static native int dataSet(long pool, String key, Object data); /** * Return the data associated with the current pool. * @param key The key for the data to retrieve * @param pool The current pool. */ public static native Object dataGet(long pool, String key); /** * Run all of the child_cleanups, so that any unnecessary files are * closed because we are about to exec a new program */ public static native void cleanupForExec(); }
//SocketWrapper
public class SocketWrapper<E> { protected volatile E socket; // Volatile because I/O and setting the timeout values occurs on a different // thread to the thread checking the timeout. protected volatile long lastAccess = System.currentTimeMillis(); protected volatile long timeout = -1; protected boolean error = false; protected long lastRegistered = 0; protected volatile int keepAliveLeft = 100; private boolean comet = false; protected boolean async = false; protected boolean keptAlive = false; private boolean upgraded = false; private boolean secure = false; /* * Used if block/non-blocking is set at the socket level. The client is * responsible for the thread-safe use of this field via the locks provided. */ private volatile boolean blockingStatus = true; private final Lock blockingStatusReadLock; private final WriteLock blockingStatusWriteLock; /* * In normal servlet processing only one thread is allowed to access the * socket at a time. That is controlled by a lock on the socket for both * read and writes). When HTTP upgrade is used, one read thread and one * write thread are allowed to access the socket concurrently. In this case * the lock on the socket is used for reads and the lock below is used for * writes. */ private final Object writeThreadLock = new Object(); public SocketWrapper(E socket) { this.socket = socket; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.blockingStatusReadLock = lock.readLock(); this.blockingStatusWriteLock =lock.writeLock(); } public E getSocket() { return socket; } public boolean isComet() { return comet; } public void setComet(boolean comet) { this.comet = comet; } public boolean isAsync() { return async; } public void setAsync(boolean async) { this.async = async; } public boolean isUpgraded() { return upgraded; } public void setUpgraded(boolean upgraded) { this.upgraded = upgraded; } public boolean isSecure() { return secure; } public void setSecure(boolean secure) { this.secure = secure; } public long getLastAccess() { return lastAccess; } public void access() { // Async timeouts are based on the time between the call to startAsync() // and complete() / dispatch() so don't update the last access time // (that drives the timeout) on every read and write when using async // processing. if (!isAsync()) { access(System.currentTimeMillis()); } } public void access(long access) { lastAccess = access; } public void setTimeout(long timeout) {this.timeout = timeout;} public long getTimeout() {return this.timeout;} public boolean getError() { return error; } public void setError(boolean error) { this.error = error; } public void setKeepAliveLeft(int keepAliveLeft) { this.keepAliveLeft = keepAliveLeft;} public int decrementKeepAlive() { return (--keepAliveLeft);} public boolean isKeptAlive() {return keptAlive;} public void setKeptAlive(boolean keptAlive) {this.keptAlive = keptAlive;} public boolean getBlockingStatus() { return blockingStatus; } public void setBlockingStatus(boolean blockingStatus) { this.blockingStatus = blockingStatus; } public Lock getBlockingStatusReadLock() { return blockingStatusReadLock; } public WriteLock getBlockingStatusWriteLock() { return blockingStatusWriteLock; } public Object getWriteThreadLock() { return writeThreadLock; } public void reset(E socket, long timeout) { async = false; blockingStatus = true; comet = false; error = false; keepAliveLeft = 100; lastAccess = System.currentTimeMillis(); this.socket = socket; this.timeout = timeout; upgraded = false; } /** * Overridden for debug purposes. No guarantees are made about the format of * this message which may vary significantly between point releases. * <p> * {@inheritDoc} */ @Override public String toString() { return super.toString() + ":" + String.valueOf(socket); } }
上一篇: 报表求大虾优化或解决方案