欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Tomcat的Connector(Protocol,CoyoteAdapterAdapter,AprEndPoint)初始化及请求处理过程

程序员文章站 2022-05-06 14:37:44
...
Tomcat的Server初始化及启动过程:http://donald-draper.iteye.com/blog/2327060
Tomcat的connector:http://hill007299.iteye.com/blog/1757198
阻塞队列--LinkedBlockingQueue:http://www.cnblogs.com/linjiqin/p/5128048.html
Socket处理(2) - AprEndpoint:http://blog.sina.com.cn/s/blog_53b51a980100qejl.html
在Server的启动过程中,我们看到了Connectors的初始化与启动,那时我们没有深入分析
,今天我们来看一下:
public class StandardService extends LifecycleMBeanBase implements Service {
    private static final String info =
        "org.apache.catalina.core.StandardService/1.0";
    private String name = null;
    private static final StringManager sm =
        StringManager.getManager(Constants.Package);
    private Server server = null;
    protected Connector connectors[] = new Connector[0];
    private final Object connectorsLock = new Object();
    protected ArrayList<Executor> executors = new ArrayList<Executor>();
    protected Container container = null;
    private ClassLoader parentClassLoader = null;

    protected void initInternal() throws LifecycleException {
        synchronized (connectorsLock) {
            for (Connector connector : connectors) {
                try {
		    //初始化Connector
                    connector.init();
                } 
            }
        }
    }
    protected void startInternal() throws LifecycleException {
        synchronized (connectorsLock) {
            for (Connector connector: connectors) {
                try {
                    if (connector.getState() != LifecycleState.FAILED) {
		        //启动connector
                        connector.start();
                    }
                } 
            }
        }
    }
}

//Connector
public class Connector extends LifecycleMBeanBase  {
   //关联Service
   protected Service service = null;
   /** The port number on which we listen for requests.*/
    protected int port = -1;
    protected String proxyName = null;
    protected int proxyPort = 0;
    /** The redirect port for non-SSL to SSL redirects. */
    protected int redirectPort = 443;
    protected String scheme = "http";
    //Get or Post 允许最大请求参数个数
    protected int maxParameterCount = 10000;
    //参数解析最大size,默认2MB
    protected int maxPostSize = 2 * 1024 * 1024;
    protected int maxSavePostSize = 4 * 1024;
    //需要解析Body的方法
    protected String parseBodyMethods = "POST";
    //协议处理handler Class
    protected String protocolHandlerClassName ="org.apache.coyote.http11.Http11Protocol";
    /**Coyote protocol handler.*/
    protected ProtocolHandler protocolHandler = null;
    /** Coyote adapter.*/
    protected Adapter adapter = null;
    //HTTP,Servlet API
    protected Mapper mapper = new Mapper();
    //MapperListener
    protected MapperListener mapperListener = new MapperListener(mapper, this);
    protected String URIEncoding = null;
    protected boolean useBodyEncodingForURI = false;

 //构造函数
 public Connector(String protocol) {
        setProtocol(protocol);
        // Instantiate protocol handler
        try {
            Class<?> clazz = Class.forName(protocolHandlerClassName);
            this.protocolHandler = (ProtocolHandler) clazz.newInstance();
        } 
    }
   //初始化
    protected void initInternal() throws LifecycleException {
        super.initInternal();
	//新建请求处理器代理
        adapter = new CoyoteAdapter(this);
	//设置协议处理器的Adapter
        protocolHandler.setAdapter(adapter);
        // Make sure parseBodyMethodsSet has a default
        if( null == parseBodyMethodsSet ) {
            setParseBodyMethods(getParseBodyMethods());
        }
        if (protocolHandler.isAprRequired() &&
                !AprLifecycleListener.isAprAvailable()) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerNoApr",
                            getProtocolHandlerClassName()));
        }
        try {
	    //初始化协议处理器
            protocolHandler.init();
        } 
        // Initialize mapper listener
	//初始化,注册到JMX,实际调用的是,LifecycleMBeanBase.initInternal
        mapperListener.init();
    }
}

//Http11Protocol
/**
 * Abstract the protocol implementation, including threading, etc.
 * Processor is single threaded and specific to stream-based protocols,
 * will not fit Jk protocols like JNI.
 */
public class Http11Protocol extends AbstractHttp11JsseProtocol<Socket> {

  protected Http11ConnectionHandler cHandler;
  private int disableKeepAlivePercentage = 75;
  public Http11Protocol() {
        //新建请求处理EndPoint为JIoEndpoint
        endpoint = new JIoEndpoint();
        cHandler = new Http11ConnectionHandler(this);
	//设置endpoint的handler
        ((JIoEndpoint) endpoint).setHandler(cHandler);
        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
        setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
    }
}

查看Http11Protocol协议 ,Http11Protocol无init方法,再查看
AbstractHttp11JsseProtocol
public abstract class AbstractHttp11JsseProtocol<S>
        extends AbstractHttp11Protocol<S> {
   protected SSLImplementation sslImplementation = null;
    // ------------------------------------------------------- Lifecycle methods
    @Override
    public void init() throws Exception {
        // SSL implementation needs to be in place before end point is
        // initialized
	//初始化SSLImplementation
        sslImplementation = SSLImplementation.getInstance(sslImplementationName);
        super.init();
    }
}

//SSLImplementation
public abstract class SSLImplementation {
    private static final String[] implementations = { JSSEImplementationClass };
    //获取SSL实现类
    public static SSLImplementation getInstance(String className)
            throws ClassNotFoundException {
        if (className == null)
            return getInstance();

        try {
            // Workaround for the J2SE 1.4.x classloading problem (under
            // Solaris).
            // Class.forName(..) fails without creating class using new.
            // This is an ugly workaround.
            if (JSSEImplementationClass.equals(className)) {
                return new org.apache.tomcat.util.net.jsse.JSSEImplementation();
            }
            Class<?> clazz = Class.forName(className);
            return (SSLImplementation) clazz.newInstance();
        }
    }
     public static SSLImplementation getInstance() throws ClassNotFoundException {
        for (int i = 0; i < implementations.length; i++) {
            try {
                SSLImplementation impl = getInstance(implementations[i]);
                return impl;
            } 
        }
    }
    }

分析AbstractHttp11JsseProtocol初始化,首先获取SSLImplementation实例
,调用父类的init
查看AbstractHttp11Protocol,无初始化方法
public abstract class AbstractHttp11Protocol<S> extends AbstractProtocol<S> {
    @Override
    protected String getProtocolName() {
        return "Http";
    }
}

查看AbstractProtocol
public abstract class AbstractProtocol<S> implements ProtocolHandler,
        MBeanRegistration {
    //Name of MBean for the Global Request Processor.
    protected ObjectName rgOname = null;
    /** Name of MBean for the ThreadPool.*/
    protected ObjectName tpOname = null;
    /**
     * Unique ID for this connector. Only used if the connector is configured
     * to use a random port as the port will change if stop(), start() is
     * called.
     */
    private int nameIndex = 0;
    /**
     * Endpoint that provides low-level network I/O - must be matched to the
     * ProtocolHandler implementation (ProtocolHandler using BIO, requires BIO
     * Endpoint etc.).
     */
    protected AbstractEndpoint<S> endpoint = null;
    /**
     * The adapter provides the link between the ProtocolHandler and the
     * connector.
     */
    protected Adapter adapter;
    // ------------------------------------------------------- Lifecycle methods

    /*
     * NOTE: There is no maintenance of state or checking for valid transitions
     * within this class. It is expected that the connector will maintain state
     * and prevent invalid state transitions.
     */

    @Override
    public void init() throws Exception {
        //初始化Global Request Processor Mbean,并注册到JMX
        if (oname == null) {
            // Component not pre-registered so register it
            oname = createObjectName();
            if (oname != null) {
                Registry.getRegistry(null, null).registerComponent(this, oname,
                    null);
            }
        }
        //初始化MBean for the ThreadPool,并注册到JMX
        if (this.domain != null) {
            try {
                tpOname = new ObjectName(domain + ":" +
                        "type=ThreadPool,name=" + getName());
                Registry.getRegistry(null, null).registerComponent(endpoint,
                        tpOname, null);
               } 
            }
            rgOname=new ObjectName(domain +
                    ":type=GlobalRequestProcessor,name=" + getName());
            Registry.getRegistry(null, null).registerComponent(
                    getHandler().getGlobal(), rgOname, null );
        }
        String endpointName = getName();
        endpoint.setName(endpointName.substring(1, endpointName.length()-1));
        try {
	    //网络IO Handler初始化
            endpoint.init();
        }
    }
    //启动
    @Override
    public void start() throws Exception {
        try {
	 //启动endPoint
            endpoint.start();
        } 
    }
}

AbstractProtocol初始化最重要的一点是网络IO Handler初始化,下面
我们来看AbstractEndpoint的初始化:
public abstract class AbstractEndpoint<S> {
public static interface Handler {
        /**
         * Different types of socket states to react upon.
         */
        public enum SocketState {
            // TODO Add a new state to the AsyncStateMachine and remove
            //      ASYNC_END (if possible)
            OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING_TOMCAT,
            UPGRADING, UPGRADED
        }
 }
 //绑定状态枚举
   protected enum BindState {
        UNBOUND, BOUND_ON_INIT, BOUND_ON_START
    }
   //http请求接受器Acceptor
    public abstract static class Acceptor implements Runnable {
        public enum AcceptorState {
            NEW, RUNNING, PAUSED, ENDED
        }

        protected volatile AcceptorState state = AcceptorState.NEW;
        public final AcceptorState getState() {
            return state;
        }
}
    /**
     * Threads used to accept new connections and pass them to worker threads.
     */
    protected Acceptor[] acceptors;
    /**
     * External Executor based thread pool.
     */
    private Executor executor = null;
    /**
     * Server socket port.
     */
    private int port;
    /**
     * Address for the server socket.
     */
    private InetAddress address;
    /**
     * Controls when the Endpoint binds the port. <code>true</code>, the default
     * binds the port on {@link #init()} and unbinds it on {@link #destroy()}.
     * If set to <code>false</code> the port is bound on {@link #start()} and
     * unbound on {@link #stop()}.
     */
    private boolean bindOnInit = true;
     private BindState bindState = BindState.UNBOUND;

    /**
     * Keepalive timeout, if not set the soTimeout is used.
     */
    private Integer keepAliveTimeout = null;
    /**
     * SSL engine.
     */
    private boolean SSLEnabled = false;
    /**
     * Socket timeout.
     */
    public int getSoTimeout() { return socketProperties.getSoTimeout(); }
    /**
     * The default is true - the created threads will be
     *  in daemon mode. If set to false, the control thread
     *  will not be daemon - and will keep the process alive.
     */
    private boolean daemon = true;
    
    /**
     * Priority of the worker threads.
     */
    protected int threadPriority = Thread.NORM_PRIORITY;
    /*
     * NOTE: There is no maintenance of state or checking for valid transitions
     * within this class other than ensuring that bind/unbind are called in the
     * right place. It is expected that the calling code will maintain state and
     * prevent invalid state transitions.
     */
    //看到这几个函数,是否有种似曾相识的感觉
    public abstract void bind() throws Exception;
    public abstract void unbind() throws Exception;
    public abstract void startInternal() throws Exception;
    public abstract void stopInternal() throws Exception;
    //初始化
    public final void init() throws Exception {
        testServerCipherSuitesOrderSupport();
	//bindOnInit默认为true,
        if (bindOnInit) {
	    //而bind为抽象函数,待子类扩展
            bind();
            bindState = BindState.BOUND_ON_INIT;
        }
    }
    //启动
     public final void start() throws Exception {
        if (bindState == BindState.UNBOUND) {
            bind();
            bindState = BindState.BOUND_ON_START;
        }
	 //而startInternal为抽象函数,待子类扩展
        startInternal();
    }
}

而AbstractEndpoint有JIoEndpoint,NioEndpoint,AprEndpoint子类实现
在serverx.xml中有这么一段:
<!-- A "Connector" represents an endpoint by which requests are received
         and responses are returned. Documentation at :
         Java HTTP Connector: /docs/config/http.html (blocking & non-blocking)
         Java AJP  Connector: /docs/config/ajp.html
         APR (HTTP/AJP) Connector: /docs/apr.html
         Define a non-SSL HTTP/1.1 Connector on port 8080
    -->
    <Connector port="8080" protocol="HTTP/1.1" 
               connectionTimeout="20000" 
               redirectPort="8443" />

JIoEndpoint,基于java bio实现,特点是每建立一个连接分配一个线程,读数据阻塞。
NioEndpoint,使用java nio实现,使用反应器模式,线程和连接解绑,多路复用。
AprEndpoint,使用Apache Portable Runtime实现,直接调用native方法,有更高的效率,但是实现依赖具体平台。
调试追踪Tomcat7.0.70发现如下图:

Tomcat的Connector(Protocol,CoyoteAdapterAdapter,AprEndPoint)初始化及请求处理过程
            
    
    博客分类: Tomcat tomcatcometsocketthreadservlet 

这里我们来看一下AprEndpoint
/**
 * APR tailored thread pool, providing the following services:
 * [list]
 * [*]Socket acceptor thread

 * [*]Socket poller thread

 * [*]Sendfile thread

 * [*]Worker threads pool

 * [/list]
 */
public class AprEndpoint extends AbstractEndpoint<Long> {
    protected static final Set<String> SSL_PROTO_ALL = new HashSet<String>();
    static {
        /* Default used if SSLProtocol is not configured, also
           used if SSLProtocol="All" */
        SSL_PROTO_ALL.add(Constants.SSL_PROTO_TLSv1);
        SSL_PROTO_ALL.add(Constants.SSL_PROTO_TLSv1_1);
        SSL_PROTO_ALL.add(Constants.SSL_PROTO_TLSv1_2);
    }
    /**
     * Root APR memory pool.
     */
    protected long rootPool = 0;


    /**
     * Server socket "pointer".
     */
    protected long serverSock = 0;


    /**
     * APR memory pool for the server socket.
     */
    protected long serverSockPool = 0;


    /**
     * SSL context.
     */
    protected long sslContext = 0;
    private final Map<Long,AprSocketWrapper> connections =
            new ConcurrentHashMap<Long, AprSocketWrapper>();
    // ------------------------------------------------------------ Constructor

    public AprEndpoint() {
        // Need to override the default for maxConnections to align it with what
        // was pollerSize (before the two were merged)
        setMaxConnections(8 * 1024);
    }
    private static class AprSocketWrapper extends SocketWrapper<Long> {

        // This field should only be used by Poller#run()
        private int pollerFlags = 0;

        public AprSocketWrapper(Long socket) {
            super(socket);
        }
    }
    /**
     * Handling of accepted sockets.
     */
    protected Handler handler = null;
     /**
     * Poll interval, in microseconds. The smaller the value, the more CPU the poller
     * will use, but the more responsive to activity it will be.
     */
    protected int pollTime = 2000;
    /**
     * Allow comet request handling.
     */
    protected boolean useComet = true;
    /**
     * The socket poller.
     */
    protected Poller poller = null;
    /**
     * SSL protocols.
     */
    protected String SSLProtocol = "all";
    /**
     * SSL password (if a cert is encrypted, and no password has been provided, a callback
     * will ask for a password).
     */
    protected String SSLPassword = null;
    /**
     * SSL certificate file.
     */
    protected String SSLCertificateFile = null;
    /**
     * SSL certificate key file.
     */
    protected String SSLCertificateKeyFile = null;
     /**
     * SSL CA certificate path.
     */
    protected String SSLCACertificatePath = null;
     /**
     * SSL CA certificate file.
     */
    protected String SSLCACertificateFile = null;
   //初始化EndPoint,所有关键工作都在这里
    @Override
    public void bind() throws Exception {
    // Create the root APR memory pool
    try {
          //调用本地方法,创建apr内存池
          rootPool = Pool.create(0);
     }
     // Create the pool for the server socket
        serverSockPool = Pool.create(rootPool);
        // Create the APR address that will be bound
        String addressStr = null;
        if (getAddress() != null) {
            addressStr = getAddress().getHostAddress();
        }
        int family = Socket.APR_INET;
        if (Library.APR_HAVE_IPV6) {
            if (addressStr == null) {
                if (!OS.IS_BSD && !OS.IS_WIN32 && !OS.IS_WIN64)
                    family = Socket.APR_UNSPEC;
            } else if (addressStr.indexOf(':') >= 0) {
                family = Socket.APR_UNSPEC;
            }
         }
        long inetAddress = Address.info(addressStr, family,
                getPort(), 0, rootPool);
        // Create the APR server socket
        serverSock = Socket.create(Address.getInfo(inetAddress).family,
                Socket.SOCK_STREAM,
                Socket.APR_PROTO_TCP, rootPool);
        if (OS.IS_UNIX) {
            Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1);
        }
        // Deal with the firewalls that tend to drop the inactive sockets
        Socket.optSet(serverSock, Socket.APR_SO_KEEPALIVE, 1);
        //server socket地址绑定
        int ret = Socket.bind(serverSock, inetAddress);
        // Start listening on the server socket
        ret = Socket.listen(serverSock, getBacklog());
        if (OS.IS_WIN32 || OS.IS_WIN64) {
            // On Windows set the reuseaddr flag after the bind/listen
            Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1);
        }
        // Sendfile usage on systems which don't support it cause major problems
        if (useSendfile && !Library.APR_HAS_SENDFILE) {
            useSendfile = false;
        }
        // Initialize thread count default for acceptor
        if (acceptorThreadCount == 0) {
            // FIXME: Doesn't seem to work that well with multiple accept threads
            acceptorThreadCount = 1;
        }
        // Delay accepting of new connections until data is available
        // Only Linux kernels 2.4 + have that implemented
        // on other platforms this call is noop and will return APR_ENOTIMPL.
        if (deferAccept) {
            if (Socket.optSet(serverSock, Socket.APR_TCP_DEFER_ACCEPT, 1) == Status.APR_ENOTIMPL) {
                deferAccept = false;
            }
        }

        //如果SSL开启,这SSL上下文
        if (isSSLEnabled()) {
            if (SSLCertificateFile == null) {
                // This is required
                throw new Exception(sm.getString("endpoint.apr.noSslCertFile"));
            }
            // SSL protocol
            int value = SSL.SSL_PROTOCOL_NONE;
            if (SSLProtocol == null || SSLProtocol.length() == 0) {
                value = SSL.SSL_PROTOCOL_ALL;
            } else {

                Set<String> protocols = new HashSet<String>();

                // List of protocol names, separated by "+" or "-".
                // Semantics is adding ("+") or removing ("-") from left
                // to right, starting with an empty protocol set.
                // Tokens are individual protocol names or "all" for a
                // default set of supported protocols.

                // Split using a positive lookahead to keep the separator in
                // the capture so we can check which case it is.
                for (String protocol : SSLProtocol.split("(?=[-+])")) {
                    String trimmed = protocol.trim();
                    // Ignore token which only consists of prefix character
                    if (trimmed.length() > 1) {
                        if (trimmed.charAt(0) == '-') {
                            trimmed = trimmed.substring(1).trim();
                            if (trimmed.equalsIgnoreCase(Constants.SSL_PROTO_ALL)) {
                                protocols.removeAll(SSL_PROTO_ALL);
                            } else {
                                protocols.remove(trimmed);
                            }
                        } else {
                            if (trimmed.charAt(0) == '+') {
                                trimmed = trimmed.substring(1).trim();
                            }
                            if (trimmed.equalsIgnoreCase(Constants.SSL_PROTO_ALL)) {
                                protocols.addAll(SSL_PROTO_ALL);
                            } else {
                                protocols.add(trimmed);
                            }
                        }
                    }
                }
                for (String protocol : protocols) {
                    if (Constants.SSL_PROTO_SSLv2.equalsIgnoreCase(protocol)) {
                        value |= SSL.SSL_PROTOCOL_SSLV2;
                    } else if (Constants.SSL_PROTO_SSLv3.equalsIgnoreCase(protocol)) {
                        value |= SSL.SSL_PROTOCOL_SSLV3;
                    } else if (Constants.SSL_PROTO_TLSv1.equalsIgnoreCase(protocol)) {
                        value |= SSL.SSL_PROTOCOL_TLSV1;
                    } else if (Constants.SSL_PROTO_TLSv1_1.equalsIgnoreCase(protocol)) {
                        value |= SSL.SSL_PROTOCOL_TLSV1_1;
                    } else if (Constants.SSL_PROTO_TLSv1_2.equalsIgnoreCase(protocol)) {
                        value |= SSL.SSL_PROTOCOL_TLSV1_2;
                    } else {
                        // Protocol not recognized, fail to start as it is safer than
                        // continuing with the default which might enable more than the
                        // is required
                        throw new Exception(sm.getString(
                                "endpoint.apr.invalidSslProtocol", SSLProtocol));
                    }
                }
            }

            // Create SSL Context
            try {
                sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER);
            }
            if (SSLInsecureRenegotiation) {
                boolean legacyRenegSupported = false;
                try {
                    legacyRenegSupported = SSL.hasOp(SSL.SSL_OP_ALLOW_UNSAFE_LEGACY_RENEGOTIATION);
                    if (legacyRenegSupported)
                        SSLContext.setOptions(sslContext, SSL.SSL_OP_ALLOW_UNSAFE_LEGACY_RENEGOTIATION);
                } 
            }

            // Set cipher order: client (default) or server
            if (SSLHonorCipherOrder) {
                boolean orderCiphersSupported = false;
                try {
                    orderCiphersSupported = SSL.hasOp(SSL.SSL_OP_CIPHER_SERVER_PREFERENCE);
                    if (orderCiphersSupported)
                        SSLContext.setOptions(sslContext, SSL.SSL_OP_CIPHER_SERVER_PREFERENCE);
                } 
            }
            // Disable compression if requested
            if (SSLDisableCompression) {
                boolean disableCompressionSupported = false;
                try {
                    disableCompressionSupported = SSL.hasOp(SSL.SSL_OP_NO_COMPRESSION);
                    if (disableCompressionSupported)
                        SSLContext.setOptions(sslContext, SSL.SSL_OP_NO_COMPRESSION);
                } 
            }

            // List the ciphers that the client is permitted to negotiate
            SSLContext.setCipherSuite(sslContext, SSLCipherSuite);
            // Load Server key and certificate
            SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA);
            // Set certificate chain file
            SSLContext.setCertificateChainFile(sslContext, SSLCertificateChainFile, false);
            // Support Client Certificates
            SSLContext.setCACertificate(sslContext, SSLCACertificateFile, SSLCACertificatePath);
            // Set revocation
            SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath);
            // Client certificate verification
            value = SSL.SSL_CVERIFY_NONE;
            if ("optional".equalsIgnoreCase(SSLVerifyClient)) {
                value = SSL.SSL_CVERIFY_OPTIONAL;
            } else if ("require".equalsIgnoreCase(SSLVerifyClient)) {
                value = SSL.SSL_CVERIFY_REQUIRE;
            } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) {
                value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA;
            }
            SSLContext.setVerify(sslContext, value, SSLVerifyDepth);
            // For now, sendfile is not supported with SSL
            useSendfile = false;
        }
    }
    /**
     * Start the APR endpoint, creating acceptor, poller and sendfile threads.
     */
    @Override
    public void startInternal() throws Exception {

        if (!running) {
            running = true;
            paused = false;

            // Create worker collection
            if (getExecutor() == null) {
	       //AbstractEndpoint创建线程执行器,
                createExecutor();
            }
            //初始化连接共享锁
            initializeConnectionLatch();

            // Start poller thread
            //bind监听端口地址,并监听处理HTTP(TCP)请求,初始化SSL
            poller = new Poller();
            poller.init();
            Thread pollerThread = new Thread(poller, getName() + "-Poller");
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();

            // Start sendfile thread
            if (useSendfile) {
                sendfile = new Sendfile();
                sendfile.init();
                Thread sendfileThread =
                        new Thread(sendfile, getName() + "-Sendfile");
                sendfileThread.setPriority(threadPriority);
                sendfileThread.setDaemon(true);
                sendfileThread.start();
            }
	    
            startAcceptorThreads();
            // Start async timeout thread
            asyncTimeout = new AsyncTimeout();
            Thread timeoutThread = new Thread(asyncTimeout,
                    getName() + "-AsyncTimeout");
            timeoutThread.setPriority(threadPriority);
            timeoutThread.setDaemon(true);
            timeoutThread.start();
        }
 }

查看AbstractEndpoint
    //创建线程执行器
   
public void createExecutor() {
        internalExecutor = true;
	//新建任务队列
        TaskQueue taskqueue = new TaskQueue();
	//新建任务线程工程
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        //初始化线程执行器
	executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent( (ThreadPoolExecutor) executor);
    }

//查看TaskQueue
/**
 * As task queue specifically designed to run with a thread pool executor.
 * The task queue is optimised to properly utilize threads within
 * a thread pool executor. If you use a normal queue, the executor will spawn threads
 * when there are idle threads and you wont be able to force items unto the queue itself
 *
 */
TaskQueue继承了LinkedBlockingQueue
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
    private ThreadPoolExecutor parent = null;
    // no need to be volatile, the one times when we change and read it occur in
    // a single thread (the one that did stop a context and fired listeners)
    private Integer forcedRemainingCapacity = null;
    @Override
    //如果可能的话,将Runnable加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }
    @Override
    //获取并移除此队列的头Runnable,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。
    public Runnable poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        Runnable runnable = super.poll(timeout, unit);
        if (runnable == null && parent != null) {
            // the poll timed out, it gives an opportunity to stop the current
            // thread if needed to avoid memory leaks.
            parent.stopCurrentThreadIfNeeded();
        }
        return runnable;
    }

    @Override
    //获取BlockingQueue里排在首位的对象Runnable,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止
    public Runnable take() throws InterruptedException {
        if (parent != null && parent.currentThreadShouldBeStopped()) {
            return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
                    TimeUnit.MILLISECONDS);
            // yes, this may return null (in case of timeout) which normally
            // does not occur with take()
            // but the ThreadPoolExecutor implementation allows this
        }
        return super.take();
    }
 }

//查看TaskThreadFactory
/**
 * Simple task thread factory to use to create threads for an executor implementation.
 */
public class TaskThreadFactory implements ThreadFactory {
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    private final boolean daemon;
    private final int threadPriority;
    public TaskThreadFactory(String namePrefix, boolean daemon, int priority) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        this.namePrefix = namePrefix;
        this.daemon = daemon;
        this.threadPriority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
        t.setDaemon(daemon);
        t.setPriority(threadPriority);
        return t;
    }
}

查看ThreadPoolExecutor
ThreadPoolExecutor继承了java.util.concurrent.ThreadPoolExecutor
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {}
查看Poller线程的初始化和启动,
Poller类主要负责poll传入的socket连接(提供add(),由其他线程传入),如果发现有事件,交给AjpConnectionHandler处理。
// Start poller thread
poller = new Poller();
poller.init();
Thread pollerThread = new Thread(poller, getName() + "-Poller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();

Poller是AprEndPointer的内部类
// ------------------------------------------------------ Poller Inner Class
   public class Poller implements Runnable {
        /** Pointers to the pollers.*/
        protected long[] pollers = null;
        /** Actual poller size. */
        protected int actualPollerSize = 0;
        /** Amount of spots left in the poller. */
        protected int[] pollerSpace = null;
        /** Amount of low level pollers in use by this poller.*/
        protected int pollerCount;
        /** Timeout value for the poll call. */
        protected int pollerTime;
        /**
         * Variable poller timeout that adjusts depending on how many poll sets
         * are in use so that the total poll time across all poll sets remains
         * equal to pollTime.
         */
        private int nextPollerTime;
        /** Root pool. */
        protected long pool = 0;
        /** Socket descriptors.*/
        protected long[] desc;
        /** List of sockets to be added to the poller.*/
        protected SocketList addList = null; 
        /** List of sockets to be closed.*/
        private SocketList closeList = null;
        /**
         * Structure used for storing timeouts.
         */
        protected SocketTimeouts timeouts = null;
        /**
         * Last run of maintain. Maintain will run approximately once every one
         * second (may be slightly longer between runs).
         */
        protected long lastMaintain = System.currentTimeMillis();
        /**
         * The number of connections currently inside this Poller. The correct
         * operation of the Poller depends on this figure being correct. If it
         * is not, it is possible that the Poller will enter a wait loop where
         * it waits for the next connection to be added to the Poller before it
         * calls poll when it should still be polling existing connections.
         * Although not necessary at the time of writing this comment, it has
         * been implemented as an AtomicInteger to ensure that it remains
         * thread-safe.
         */
        private AtomicInteger connectionCount = new AtomicInteger(0);
        public int getConnectionCount() { return connectionCount.get(); }
        private volatile boolean pollerRunning = true;
        /**
         * Create the poller. With some versions of APR, the maximum poller size
         * will be 62 (recompiling APR is necessary to remove this limitation).
         */
	 //初始化Poller
        protected void init() {
            pool = Pool.create(serverSockPool);
            // Single poller by default
            int defaultPollerSize = getMaxConnections();
            if ((OS.IS_WIN32 || OS.IS_WIN64) && (defaultPollerSize > 1024)) {
                // The maximum per poller to get reasonable performance is 1024
                // Adjust poller size so that it won't reach the limit. This is
                // a limitation of XP / Server 2003 that has been fixed in
                // Vista / Server 2008 onwards.
                actualPollerSize = 1024;
            } else {
                actualPollerSize = defaultPollerSize;
            }

            timeouts = new SocketTimeouts(defaultPollerSize);

            // At the moment, setting the timeout is useless, but it could get
            // used again as the normal poller could be faster using maintain.
            // It might not be worth bothering though.
            long pollset = allocatePoller(actualPollerSize, pool, -1);
            if (pollset == 0 && actualPollerSize > 1024) {
                actualPollerSize = 1024;
                pollset = allocatePoller(actualPollerSize, pool, -1);
            }
            if (pollset == 0) {
                actualPollerSize = 62;
                pollset = allocatePoller(actualPollerSize, pool, -1);
            }

            pollerCount = defaultPollerSize / actualPollerSize;
            pollerTime = pollTime / pollerCount;
            nextPollerTime = pollerTime;

            pollers = new long[pollerCount];
            pollers[0] = pollset;
            for (int i = 1; i < pollerCount; i++) {
                pollers[i] = allocatePoller(actualPollerSize, pool, -1);
            }
            pollerSpace = new int[pollerCount];
            for (int i = 0; i < pollerCount; i++) {
                pollerSpace[i] = actualPollerSize;
            }
            desc = new long[actualPollerSize * 2];
            connectionCount.set(0);
            addList = new SocketList(defaultPollerSize);
            closeList = new SocketList(defaultPollerSize);
        }
         /**
         * The background thread that listens for incoming TCP/IP connections
         * and hands them off to an appropriate processor.
         */
	 //监听TCP/IP连接,并交给相应handler处理
        @Override
        public void run() {
		// Poll for the specified interval
                    for (int i = 0; i < pollers.length; i++) {
		         for (int n = 0; n < rv; n++) {
                                long timeout = timeouts.remove(desc[n*2+1]);
                                AprSocketWrapper wrapper = connections.get(
                                        Long.valueOf(desc[n*2+1]));
                                if (getLog().isDebugEnabled()) {
                                    log.debug(sm.getString(
                                            "endpoint.debug.pollerProcess",
                                            Long.valueOf(desc[n*2+1]),
                                            Long.valueOf(desc[n*2])));
                                }
                                wrapper.pollerFlags = wrapper.pollerFlags & ~((int) desc[n*2]);
                                // Check for failed sockets and hand this socket off to a worker
				//处理请求
                                if (wrapper.isComet()) {
                                    // Event processes either a read or a write depending on what the poller returns
                                    //APR_POLLHUP
				    if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
                                            || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
                                            || ((desc[n*2] & Poll.APR_POLLNVAL) == Poll.APR_POLLNVAL)) {
                                        if (!processSocket(desc[n*2+1], SocketStatus.ERROR)) {
                                            // Close socket and clear pool
                                            closeSocket(desc[n*2+1]);
                                        }
					//APR_POLLIN
                                    } else if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
                                        if (wrapper.pollerFlags != 0) {
                                            add(desc[n*2+1], 1, wrapper.pollerFlags);
                                        }
					//OPEN_READ
                                        if (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) {
                                            // Close socket and clear pool
                                            closeSocket(desc[n*2+1]);
                                        }
					//APR_POLLOUT
                                    } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
                                        if (wrapper.pollerFlags != 0) {
                                            add(desc[n*2+1], 1, wrapper.pollerFlags);
                                        }
                                        if (!processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) {
                                            // Close socket and clear pool
                                            closeSocket(desc[n*2+1]);
                                        }
                                    } else {
                                        // Unknown event
                                        getLog().warn(sm.getString(
                                                "endpoint.apr.pollUnknownEvent",
                                                Long.valueOf(desc[n*2])));
                                        if (!processSocket(desc[n*2+1], SocketStatus.ERROR)) {
                                            // Close socket and clear pool
                                            closeSocket(desc[n*2+1]);
                                        }
                                    }
                                }
		    }
		    synchronized (this) {
                this.notifyAll();
            }
        }
    /**
     * Process given socket. Called in non-comet mode, typically keep alive
     * or upgraded protocol.
     */
    //处理socket事件
    public boolean processSocket(long socket, SocketStatus status) {
        try {
            Executor executor = getExecutor();
            if (executor == null) {
                log.warn(sm.getString("endpoint.warn.noExector",
                        Long.valueOf(socket), null));
            } else {
                SocketWrapper<Long> wrapper =
                        connections.get(Long.valueOf(socket));
                // Make sure connection hasn't been closed
                if (wrapper != null) {
		    //创建SocketProcessor处理线程,并执行
                    executor.execute(new SocketProcessor(wrapper, status));
                }
            }
        } 
        return true;
    }
    }

来看SocketProcessor
// -------------------------------------------- SocketProcessor Inner Class
    /**
     * This class is the equivalent of the Worker, but will simply use in an
     * external Executor thread pool.
     */
    protected class SocketProcessor implements Runnable {

        private final SocketWrapper<Long> socket;
        private final SocketStatus status;
        public SocketProcessor(SocketWrapper<Long> socket,
                SocketStatus status) {
            this.socket = socket;
            if (status == null) {
                // Should never happen
                throw new NullPointerException();
            }
            this.status = status;
        }
        @Override
        public void run() {

            // Upgraded connections need to allow multiple threads to access the
            // connection at the same time to enable blocking IO to be used when
            // Servlet 3.1 NIO has been configured
            if (socket.isUpgraded() && SocketStatus.OPEN_WRITE == status) {
                synchronized (socket.getWriteThreadLock()) {
                    doRun();
                }
            } else {
                synchronized (socket) {
                    doRun();
                }
            }
        }
        private void doRun() {
            // Process the request from this socket
            if (socket.getSocket() == null) {
                // Closed in another thread
                return;
            }
	    //handler处理请求
            SocketState state = handler.process(socket, status);
            if (state == Handler.SocketState.CLOSED) {
                // Close socket and pool
                closeSocket(socket.getSocket().longValue());
                socket.socket = null;
            } else if (state == Handler.SocketState.LONG) {
                socket.access();
                if (socket.async) {
                    waitingRequests.add(socket);
                }
            } else if (state == Handler.SocketState.ASYNC_END) {
                socket.access();
                SocketProcessor proc = new SocketProcessor(socket,
                        SocketStatus.OPEN_READ);
                getExecutor().execute(proc);
            }
        }
    }

根据handler.process(socket, status),我们去查看handler是如何处理请求的在Http11Protocol的
构造方法中可以看到handler就是Http11ConnectionHandler
public Http11Protocol() {
        //新建请求处理EndPoint为JIoEndpoint
        endpoint = new JIoEndpoint();
        cHandler = new Http11ConnectionHandler(this);
	//设置endpoint的handler
        ((JIoEndpoint) endpoint).setHandler(cHandler);
        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
        setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
    }

查看Http11ConnectionHandler是Http11Protocol的内部类
// -----------------------------------  Http11ConnectionHandler Inner Class
    protected static class Http11ConnectionHandler
            extends AbstractConnectionHandler<Socket, Http11Processor> implements Handler {
	 }

而Http11ConnectionHandler没有process(socket, status),这可方法定已在Handler在,是一个抽象方法
查看AbstractConnectionHandler,为AbstractProtocol的内部类
// ------------------------------------------- Connection handler base class
    
    protected abstract static class AbstractConnectionHandler<S,P extends Processor<S>>
            implements AbstractEndpoint.Handler {
	@SuppressWarnings("deprecation") 
	// Old HTTP upgrade method has been deprecated
	//处理HTTP请求
        public SocketState process(SocketWrapper<S> wrapper,SocketStatus status) {
            if (wrapper == null) {
                // Nothing to do. Socket has been closed.
                return SocketState.CLOSED;
            }
            S socket = wrapper.getSocket();
            if (socket == null) {
                // Nothing to do. Socket has been closed.
                return SocketState.CLOSED;
            }
            Processor<S> processor = connections.get(socket);
            if (status == SocketStatus.DISCONNECT && processor == null) {
                // Nothing to do. Endpoint requested a close and there is no
                // longer a processor associated with this socket.
                return SocketState.CLOSED;
            }
            wrapper.setAsync(false);
            ContainerThreadMarker.markAsContainerThread();
            try {
                if (processor == null) {
                    processor = recycledProcessors.poll();
                }
                if (processor == null) {
		    //创建处理器
                    processor = createProcessor();
                }
                //初始化SSL,在子类中扩展
                initSsl(wrapper, processor);
                SocketState state = SocketState.CLOSED;
                do {
                    if (status == SocketStatus.CLOSE_NOW) {
                        processor.errorDispatch();
                        state = SocketState.CLOSED;
                    } else if (status == SocketStatus.DISCONNECT &&
                            !processor.isComet()) {
                        // Do nothing here, just wait for it to get recycled
                        // Don't do this for Comet we need to generate an end
                        // event (see BZ 54022)
                    } else if (processor.isAsync() || state == SocketState.ASYNC_END) {
                        state = processor.asyncDispatch(status);
                        if (state == SocketState.OPEN) {
                            // release() won't get called so in case this request
                            // takes a long time to process, remove the socket from
                            // the waiting requests now else the async timeout will
                            // fire
                            getProtocol().endpoint.removeWaitingRequest(wrapper);
                            // There may be pipe-lined data to read. If the data
                            // isn't processed now, execution will exit this
                            // loop and call release() which will recycle the
                            // processor (and input buffer) deleting any
                            // pipe-lined data. To avoid this, process it now.
                            state = processor.process(wrapper);
                        }
                    } else if (processor.isComet()) {
                        state = processor.event(status);
                    } {
                        state = processor.process(wrapper);
                    }
        }
	//带子类扩展
	 protected abstract P createProcessor();
        protected abstract void initSsl(SocketWrapper<S> socket,
                Processor<S> processor);
        protected abstract void longPoll(SocketWrapper<S> socket,
                Processor<S> processor);
        protected abstract void release(SocketWrapper<S> socket,
                Processor<S> processor, boolean socketClosing,
                boolean addToPoller);
}
查看Http11ConnectionHandler,createProcessor
	 @Override
        protected Http11AprProcessor createProcessor() {
            Http11AprProcessor processor = new Http11AprProcessor(
                    proto.getMaxHttpHeaderSize(), (AprEndpoint)proto.endpoint,
                    proto.getMaxTrailerSize(), proto.getAllowedTrailerHeadersAsSet(),
                    proto.getMaxExtensionSize(), proto.getMaxSwallowSize());
	   //设置处理器适配器,在connector构造函数中,
	   //adapter = new CoyoteAdapter(this);
           //protocolHandler.setAdapter(adapter);
	   //有这么两句
            processor.setAdapter(proto.adapter);
            processor.setMaxKeepAliveRequests(proto.getMaxKeepAliveRequests());
            processor.setKeepAliveTimeout(proto.getKeepAliveTimeout());
            processor.setConnectionUploadTimeout(
                    proto.getConnectionUploadTimeout());
            processor.setDisableUploadTimeout(proto.getDisableUploadTimeout());
            processor.setCompressionMinSize(proto.getCompressionMinSize());
            processor.setCompression(proto.getCompression());
            processor.setNoCompressionUserAgents(proto.getNoCompressionUserAgents());
            processor.setCompressableMimeTypes(proto.getCompressableMimeTypes());
            processor.setRestrictedUserAgents(proto.getRestrictedUserAgents());
            processor.setSocketBuffer(proto.getSocketBuffer());
            processor.setMaxSavePostSize(proto.getMaxSavePostSize());
            processor.setServer(proto.getServer());
            processor.setClientCertProvider(proto.getClientCertProvider());
            register(processor);
            return processor;
        }

查看Http11AprProcessor
public class Http11AprProcessor extends AbstractHttp11Processor<Long> {}

无处理函数process(socket, status)
查看AbstractHttp11Processor
public abstract class AbstractHttp11Processor<S> extends AbstractProcessor<S> {
/**
     * Process pipelined HTTP requests using the specified input and output
     * streams.
     * @param socketWrapper Socket from which the HTTP requests will be read
     *               and the HTTP responses will be written.
     * @throws IOException error during an I/O operation
     */
     //处理HTTP请求
    @Override
    public SocketState process(SocketWrapper<S> socketWrapper)
        throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

        // Setting up the I/O
        setSocketWrapper(socketWrapper);
        getInputBuffer().init(socketWrapper, endpoint);
        getOutputBuffer().init(socketWrapper, endpoint);
	 while (!getErrorState().isError() && keepAlive && !comet && !isAsync() &&
                upgradeInbound == null &&
                httpUpgradeHandler == null && !endpoint.isPaused()) {

            // Parsing the request header
            try {
                setRequestLineReadTimeout();
	       //503
               if (endpoint.isPaused()) {
                    // 503 - Service unavailable
                    response.setStatus(503);
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                } else {
                    keptAlive = true;
                    // Set this every time in case limit has been changed via JMX
                    request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
                    // Currently only NIO will ever return false here
                    if (!getInputBuffer().parseHeaders()) {
                        // We've read part of the request, don't recycle it
                        // instead associate it with the socket
                        openSocket = true;
                        readComplete = false;
                        break;
                    }
                    if (!disableUploadTimeout) {
                        setSocketTimeout(connectionUploadTimeout);
                    }
                }
		catch (Throwable t) {
                // 400 - Bad Request
                response.setStatus(400);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
                getAdapter().log(request, response, 0);
            }
	    if (!getErrorState().isError()) {
                // Setting up filters, and parse some request headers
                rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
                try {
		    //请求预处理
                    prepareRequest();
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                    getAdapter().log(request, response, 0);
                }
            }
           //503,400,500,看到这些数字想到了什么?
	   // Process the request in the adapter
	   //适配器,处理request, response
            if (!getErrorState().isError()) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    //这个就是关键了,service函数,这个adapter,上面已经讲过
		    //实际上是CoyoteAdapter
                    adapter.service(request, response);
	    }
	 /**
         * After reading the request headers, we have to setup the request filters.
         */
         protected void prepareRequest() {
		MimeHeaders headers = request.getMimeHeaders();
		   // Check for a full URI (including protocol://host:port/)
                 ByteChunk uriBC = request.requestURI().getByteChunk();
		 if (uriBC.startsWithIgnoreCase("http", 0)) {
		    int pos = uriBC.indexOf("://", 0, 3, 4);
		    int uriBCStart = uriBC.getStart();
		    int slashPos = -1;
		    if (pos != -1) {
			byte[] uriB = uriBC.getBytes();
			slashPos = uriBC.indexOf('/', pos + 3);
			if (slashPos == -1) {
			    slashPos = uriBC.getLength();
			    // Set URI as "/"
			    request.requestURI().setBytes
				(uriB, uriBCStart + pos + 1, 1);
			} else {
			    request.requestURI().setBytes
				(uriB, uriBCStart + slashPos,
				 uriBC.getLength() - slashPos);
			}
			MessageBytes hostMB = headers.setValue("host");
			hostMB.setBytes(uriB, uriBCStart + pos + 3,
					slashPos - pos - 3);
		    }

        }
    }
}

从AbstractHttp11Processor的process(SocketWrapper<S> socketWrapper)可以看出,
Http处理器,首先解析HTTP的头部协议,过滤器,以及编码问题,然后交给Adapter去处理
来看CoyoteAdapter
public class CoyoteAdapter implements Adapter {
     /**
     * Construct a new CoyoteProcessor associated with the specified connector.
     *
     * @param connector CoyoteConnector that owns this processor
     */
    //一个Adapter关联一个Connector
    public CoyoteAdapter(Connector connector) {
        super();
        this.connector = connector;
    }
    private Connector connector = null;
     /**
     * Encoder for the Location URL in HTTP redirects.
     */
    protected static URLEncoder urlEncoder;
   //url安全字符集
    static {
        urlEncoder = new URLEncoder();
        urlEncoder.addSafeCharacter('-');
        urlEncoder.addSafeCharacter('_');
        urlEncoder.addSafeCharacter('.');
        urlEncoder.addSafeCharacter('*');
        urlEncoder.addSafeCharacter('/');
    }
    **
     * Service method.
     */
    @Override
    public void service(org.apache.coyote.Request req,
                        org.apache.coyote.Response res)
        throws Exception {
	Request request = (Request) req.getNote(ADAPTER_NOTES);
        Response response = (Response) res.getNote(ADAPTER_NOTES);
	try {
            // Parse and set Catalina and configuration specific
            // request parameters
            req.getRequestProcessor().setWorkerThreadName(Thread.currentThread().getName());
            postParseSuccess = postParseRequest(req, request, res, response);
            if (postParseSuccess) {
                //check valves if we support async
                request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported());
                // Calling the container
		//这个就是我们一直想要找的,Servlet处理request
                connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
	}
}

前文中我们分析过,每个connector关联一个Service,每个Service有一个Engine,每个Engine有多个Host,每个
Host有一个Pipeline(请求处理链),而Pipeline中是StandardWrappeValve链,StandardWrappeValve是处理
Request的Servlet包装类,那么,此时应该理解这句的含义了吧!
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);


从上面可以看出Poller,调用本地方创建socket监听线程,当监听到连接时,创一个SocketProcessor,即socket处理线程SOCKET处理线程,委托给Http11ConnectionHandler(Http11ConnectionHandler是Http11Protocol的内部类)处理,
Http11ConnectionHandler继承AbstractConnectionHandler,AbstractConnectionHandler处理请求是获取precessor,precessor从Http11ConnectionHandler获取,实际为Http11AprProcessor,Http11AprProcessor继承AbstractHttp11Processor,并处理请求在process(SocketWrapper<S> socketWrapper)中,在这个方法中处理请求头部,再讲实际业务逻辑处理委托CoyoteAdapter的service方法,service方法通过反射调用对应的servlet的service方法来处理逻辑。

在这里,简单总结一下,每个Connector关联一个Http11Protocol,Http11Protocol关联一个Http11ConnectionHandler,同时Http11Protocol关联一个Endpoint和Adapter,这个Http11ConnectionHandler实际要注入到Endpoint中,每个handler管理创建Http11AprProcessor,Http11AprProcessor关联一个Adapter(从Http11Protocol获取),
而Adapter关联一个Connector,Endpoint处理Socket的请求实际通过Http11AprProcessor,Http11AprProcessor委托给Adapter,Adapter将请求委托给Servlet。

上面一部分的堆栈有点深,现在退出来,回到AprEndpoint的startInternal看     
startAcceptorThreads();这句话
这个方法在AbstractEndPoint中
protected final void startAcceptorThreads() {
        int count = getAcceptorThreadCount();
        acceptors = new Acceptor[count];
        for (int i = 0; i < count; i++) {
	    //创建
            acceptors[i] = createAcceptor();
            String threadName = getName() + "-Acceptor-" + i;
            acceptors[i].setThreadName(threadName);
            Thread t = new Thread(acceptors[i], threadName);
            t.setPriority(getAcceptorThreadPriority());
            t.setDaemon(getDaemon());
            t.start();
        }
    }
    /**
     * Hook to allow Endpoints to provide a specific Acceptor implementation.
     */
    //委托给子类扩展
 protected abstract Acceptor createAcceptor();

//AbstractEndPoint的抽象静态内部类Acceptor,描述Socket请求处理状态
public abstract static class Acceptor implements Runnable {
        public enum AcceptorState {
            NEW, RUNNING, PAUSED, ENDED
        }

        protected volatile AcceptorState state = AcceptorState.NEW;
        public final AcceptorState getState() {
            return state;
        }

        private String threadName;
        protected final void setThreadName(final String threadName) {
            this.threadName = threadName;
        }
        protected final String getThreadName() {
            return threadName;
        }
    }

查看AprEndpoint的createAcceptor方法
  @Override
    protected AbstractEndpoint.Acceptor createAcceptor() {
        return new Acceptor();
    }

总结Connector
每个connector关联一个Service,每个Service有一个Engine,每个Engine有多个Host,每个
Host有一个Pipeline(请求处理链),而Pipeline中是StandardWrappeValve链,StandardWrappeValve是处理Request的Servlet包装类;同时每个Connector关联一个Http11Protocol,Http11Protocol关联一个Http11ConnectionHandler,同时Http11Protocol关联一个Endpoint和Adapter,这个Http11ConnectionHandler实际要注入到Endpoint中,,每个handler管理创建Http11AprProcessor,Http11AprProcessor关联一个Adapter(从Http11Protocol获取),而Adapter关联一个Connector,Endpoint处理Socket的请求实际通过Http11AprProcessor,Http11AprProcessor委托给Adapter,Adapter将请求委托给Servlet。Connector初始化最主要的是,初始化Http11Protocol,实际初始化Endpoint,EndPoint初始化bind,SOCKET地址,并监听请求及初始化SSL,Endpoint启动,即初始化Poller(处理socket请求)。

下面一篇文章我们来看看,JioEndPoint。
//ContainerThreadMarker
public class ContainerThreadMarker {
    private static final ThreadLocal<Boolean> marker = new ThreadLocal<Boolean>();
    public static boolean isContainerThread() {
        Boolean flag = marker.get();
        if (flag == null) {
            return false;
        } else {
            return flag.booleanValue();
        }
    }
    public static void markAsContainerThread() {
        marker.set(Boolean.TRUE);
    }
}

//Pool
public class Pool {

    /**
     * Create a new pool.
     * @param parent The parent pool.  If this is 0, the new pool is a root
     * pool.  If it is non-zero, the new pool will inherit all
     * of its parent pool's attributes, except the apr_pool_t will
     * be a sub-pool.
     * @return The pool we have just created.
    */
    public static native long create(long parent);

    /**
     * Clear all memory in the pool and run all the cleanups. This also destroys all
     * subpools.
     * @param pool The pool to clear
     * This does not actually free the memory, it just allows the pool
     *         to re-use this memory for the next allocation.
     */
    public static native void clear(long pool);

    /**
     * Destroy the pool. This takes similar action as apr_pool_clear() and then
     * frees all the memory.
     * This will actually free the memory
     * @param pool The pool to destroy
     */
    public static native void destroy(long pool);

    /**
     * Get the parent pool of the specified pool.
     * @param pool The pool for retrieving the parent pool.
     * @return The parent of the given pool.
     */
    public static native long parentGet(long pool);

    /**
     * Determine if pool a is an ancestor of pool b
     * @param a The pool to search
     * @param b The pool to search for
     * @return True if a is an ancestor of b, NULL is considered an ancestor
     * of all pools.
     */
    public static native boolean isAncestor(long a, long b);


    /*
     * Cleanup
     *
     * Cleanups are performed in the reverse order they were registered.  That is:
     * Last In, First Out.  A cleanup function can safely allocate memory from
     * the pool that is being cleaned up. It can also safely register additional
     * cleanups which will be run LIFO, directly after the current cleanup
     * terminates.  Cleanups have to take caution in calling functions that
     * create subpools. Subpools, created during cleanup will NOT automatically
     * be cleaned up.  In other words, cleanups are to clean up after themselves.
     */

    /**
     * Register a function to be called when a pool is cleared or destroyed
     * @param pool The pool register the cleanup with
     * @param o The object to call when the pool is cleared
     *                      or destroyed
     * @return The cleanup handler.
     */
    public static native long cleanupRegister(long pool, Object o);

    /**
     * Remove a previously registered cleanup function
     * @param pool The pool remove the cleanup from
     * @param data The cleanup handler to remove from cleanup
     */
    public static native void cleanupKill(long pool, long data);

    /**
     * Register a process to be killed when a pool dies.
     * @param a The pool to use to define the processes lifetime
     * @param proc The process to register
     * @param how How to kill the process, one of:
     * <PRE>
     * APR_KILL_NEVER         -- process is never sent any signals
     * APR_KILL_ALWAYS        -- process is sent SIGKILL on apr_pool_t cleanup
     * APR_KILL_AFTER_TIMEOUT -- SIGTERM, wait 3 seconds, SIGKILL
     * APR_JUST_WAIT          -- wait forever for the process to complete
     * APR_KILL_ONLY_ONCE     -- send SIGTERM and then wait
     * </PRE>
     */
    public static native void noteSubprocess(long a, long proc, int how);

    /**
     * Allocate a block of memory from a pool
     * @param p The pool to allocate from
     * @param size The amount of memory to allocate
     * @return The ByteBuffer with allocated memory
     */
    public static native ByteBuffer alloc(long p, int size);

    /**
     * Allocate a block of memory from a pool and set all of the memory to 0
     * @param p The pool to allocate from
     * @param size The amount of memory to allocate
     * @return The ByteBuffer with allocated memory
     */
    public static native ByteBuffer calloc(long p, int size);

    /*
     * User data management
     */

    /**
     * Set the data associated with the current pool
     * @param data The user data associated with the pool.
     * @param key The key to use for association
     * @param pool The current pool
     * <br><b>Warning :</b>
     * The data to be attached to the pool should have a life span
     * at least as long as the pool it is being attached to.
     * Object attached to the pool will be globally referenced
     * until the pool is cleared or dataSet is called with the null data.
     * @return APR Status code.
     */
     public static native int dataSet(long pool, String key, Object data);

    /**
     * Return the data associated with the current pool.
     * @param key The key for the data to retrieve
     * @param pool The current pool.
     */
     public static native Object dataGet(long pool, String key);

    /**
     * Run all of the child_cleanups, so that any unnecessary files are
     * closed because we are about to exec a new program
     */
    public static native void cleanupForExec();

}


//SocketWrapper
public class SocketWrapper<E> {

    protected volatile E socket;

    // Volatile because I/O and setting the timeout values occurs on a different
    // thread to the thread checking the timeout.
    protected volatile long lastAccess = System.currentTimeMillis();
    protected volatile long timeout = -1;
    
    protected boolean error = false;
    protected long lastRegistered = 0;
    protected volatile int keepAliveLeft = 100;
    private boolean comet = false;
    protected boolean async = false;
    protected boolean keptAlive = false;
    private boolean upgraded = false;
    private boolean secure = false;

    /*
     * Used if block/non-blocking is set at the socket level. The client is
     * responsible for the thread-safe use of this field via the locks provided.
     */
    private volatile boolean blockingStatus = true;
    private final Lock blockingStatusReadLock;
    private final WriteLock blockingStatusWriteLock;

    /*
     * In normal servlet processing only one thread is allowed to access the
     * socket at a time. That is controlled by a lock on the socket for both
     * read and writes). When HTTP upgrade is used, one read thread and one
     * write thread are allowed to access the socket concurrently. In this case
     * the lock on the socket is used for reads and the lock below is used for
     * writes.
     */
    private final Object writeThreadLock = new Object();

    public SocketWrapper(E socket) {
        this.socket = socket;
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        this.blockingStatusReadLock = lock.readLock();
        this.blockingStatusWriteLock =lock.writeLock();
    }

    public E getSocket() {
        return socket;
    }

    public boolean isComet() { return comet; }
    public void setComet(boolean comet) { this.comet = comet; }
    public boolean isAsync() { return async; }
    public void setAsync(boolean async) { this.async = async; }
    public boolean isUpgraded() { return upgraded; }
    public void setUpgraded(boolean upgraded) { this.upgraded = upgraded; }
    public boolean isSecure() { return secure; }
    public void setSecure(boolean secure) { this.secure = secure; }
    public long getLastAccess() { return lastAccess; }
    public void access() {
        // Async timeouts are based on the time between the call to startAsync()
        // and complete() / dispatch() so don't update the last access time
        // (that drives the timeout) on every read and write when using async
        // processing.
        if (!isAsync()) {
            access(System.currentTimeMillis());
        }
    }
    public void access(long access) { lastAccess = access; }
    public void setTimeout(long timeout) {this.timeout = timeout;}
    public long getTimeout() {return this.timeout;}
    public boolean getError() { return error; }
    public void setError(boolean error) { this.error = error; }
    public void setKeepAliveLeft(int keepAliveLeft) { this.keepAliveLeft = keepAliveLeft;}
    public int decrementKeepAlive() { return (--keepAliveLeft);}
    public boolean isKeptAlive() {return keptAlive;}
    public void setKeptAlive(boolean keptAlive) {this.keptAlive = keptAlive;}
    public boolean getBlockingStatus() { return blockingStatus; }
    public void setBlockingStatus(boolean blockingStatus) {
        this.blockingStatus = blockingStatus;
    }
    public Lock getBlockingStatusReadLock() { return blockingStatusReadLock; }
    public WriteLock getBlockingStatusWriteLock() {
        return blockingStatusWriteLock;
    }
    public Object getWriteThreadLock() { return writeThreadLock; }

    public void reset(E socket, long timeout) {
        async = false;
        blockingStatus = true;
        comet = false;
        error = false;
        keepAliveLeft = 100;
        lastAccess = System.currentTimeMillis();
        this.socket = socket;
        this.timeout = timeout;
        upgraded = false;
    }

    /**
     * Overridden for debug purposes. No guarantees are made about the format of
     * this message which may vary significantly between point releases.
     * <p>
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        return super.toString() + ":" + String.valueOf(socket);
    }
}
  • Tomcat的Connector(Protocol,CoyoteAdapterAdapter,AprEndPoint)初始化及请求处理过程
            
    
    博客分类: Tomcat tomcatcometsocketthreadservlet 
  • 大小: 26.3 KB