欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Tomcat系列(二)- EndPoint源码解析

程序员文章站 2022-08-02 22:18:21
在上一节中我们描述了Tomcat的整体架构, 我们知道了Tomcat分为两个大组件,一个连接器和一个容器。 而我们这次要讲的 EndPoint的组件就是属于连接器里面的。 它是一个通信的端点,就是负责对外实现TCP/IP协议。 EndPoint是个接口, 它的具体实现类就是 AbstractEndp ......

在上一节中我们描述了tomcat的整体架构,

我们知道了tomcat分为两个大组件,一个连接器和一个容器。

而我们这次要讲的 endpoint的组件就是属于连接器里面的。

它是一个通信的端点,就是负责对外实现tcp/ip协议。

endpoint是个接口,

它的具体实现类就是 abstractendpoint,而 abstractendpoint具体的实现类就有 aprendpointnio2endpointnioendpoint

  • aprendpoint:对应的是apr模式,简单理解就是从操作系统级别解决异步io的问题,大幅度提高服务器的处理和响应性能。但是启用这种模式需要安装一些其他的依赖库。

  • nio2endpoint:利用代码来实现异步io

  • nioendpoint:利用了java的nio实现了非阻塞io,tomcat默认启动是以这个来启动的,而这个也是我们的讲述重点。

nioendpoint中重要的组件

  我们知道 nioendpoint的原理还是对于linux的多路复用器的使用,而在多路复用器中简单来说就两个步骤。

    1. 创建一个selector,在它身上注册各种channel,然后调用select方法,等待通道中有感兴趣的事件发生。

    2. 如果有感兴趣的事情发生了,例如是读事件,那么就将信息从通道中读取出来。

  而 nioendpoint为了实现上面这两步,用了五个组件来。

  这五个组件是 limitlatchacceptorpollersocketprocessorexecutor

/**
 * threads used to accept new connections and pass them to worker threads.
 */
protected list<acceptor<u>> acceptors;

/**
 * counter for nr of connections handled by an endpoint
 */
private volatile limitlatch connectionlimitlatch = null;

/**
 * the socket pollers.
 */
private poller[] pollers = null;

// 内部类
socketprocessor

/**
 * external executor based thread pool.
 */
private executor executor = null;

  我们可以看到在代码中定义的这五个组件。具体这五个组件是干嘛的呢?

    • limitlatch:连接控制器,负责控制最大的连接数

    • acceptor:负责接收新的连接,然后返回一个 channel对象给 poller

    • poller:可以将其看成是nio中 selector,负责监控 channel的状态

    • socketprocessor:可以看成是一个被封装的任务类

    • executor:tomcat自己扩展的线程池,用来执行任务类

  用图简单表示就是以下的关系

  Tomcat系列(二)- EndPoint源码解析

  接下来我们就来分别的看一下每个组件里面关键的代码

limitlatch

  我们上面说了 limitlatch主要是用来控制tomcat所能接收的最大数量连接,如果超过了此连接,那么tomcat就会将此连接线程阻塞等待,等里面有其他连接释放了再消费此连接。

  那么 limitlatch是如何做到呢?我们可以看 limitlatch这个类

public class limitlatch {
    private static final log log = logfactory.getlog(limitlatch.class);
    
    private class sync extends abstractqueuedsynchronizer {
        private static final long serialversionuid = 1l;
        public sync() {}
        @override
        protected int tryacquireshared(int ignored) {
            long newcount = count.incrementandget();
            if (!released && newcount > limit) {
                // limit exceeded
                count.decrementandget();
                return -1;
            } else {
                return 1;
            }
        }
        @override
        protected boolean tryreleaseshared(int arg) {
            count.decrementandget();
            return true;
        }
    }

    private final sync sync;
    private final atomiclong count;
    private volatile long limit;
    private volatile boolean released = false;
}

  我们可以看到它内部实现了 abstractqueuedsynchronizer,aqs其实就是一个框架,实现它的类可以自定义控制线程什么时候挂起什么时候释放。

  limit参数就是控制的最大连接数。

  我们可以看到 abstractendpoint调用 limitlatchcountuporawait方法来判断是否能获取连接。

public void countuporawait() throws interruptedexception {
        if (log.isdebugenabled()) {
            log.debug("counting up["+thread.currentthread().getname()+"] latch="+getcount());
        }
        sync.acquiresharedinterruptibly(1);
    }

  aqs是如何知道什么时候阻塞线程呢?即不能获取连接呢?

  这些就靠用户自己实现 abstractqueuedsynchronizer自己来定义什么时候获取连接,什么时候释放连接了。

  可以看到sync类重写了 tryacquiresharedtryreleaseshared方法。

  在 tryacquireshared方法中定义了一旦当前连接数大于了设置的最大连接数,那么就会返回 -1表示将此线程放入aqs队列中等待。

acceptor

  acceptor是接收连接的,我们可以看到 acceptor实现了 runnable接口,那么在哪会新开启线程来执行 acceptor的run方法呢?

  在 abstractendpointstartacceptorthreads方法中。

protected final void startacceptorthreads() {
    int count = getacceptorthreadcount();
    acceptors = new acceptor[count];

    for (int i = 0; i < count; i++) {
        acceptors[i] = createacceptor();
        string threadname = getname() + "-acceptor-" + i;
        acceptors[i].setthreadname(threadname);
        thread t = new thread(acceptors[i], threadname);
        t.setpriority(getacceptorthreadpriority());
        t.setdaemon(getdaemon());
        t.start();
    }
}

  可以看到这里可以设置开启几个 acceptor,默认是一个。

  而一个端口只能对应一个 serversocketchannel,那么这个 serversocketchannel在哪初始化呢?我们可以看到在 acceptor<u>acceptor=newacceptor<>(this);

  这句话中传入了this进去,那么应该是由 endpoint组件初始化的连接。

  在 nioendpointinitserversocket方法中初始化了连接。

  Tomcat系列(二)- EndPoint源码解析

  这里面我们能够看到两点

    1. 在bind方法中的第二个参数表示操作系统的等待队列长度,即tomcat不再接受连接时(达到了设置的最大连接数),但是在操作系统层面还是能够接受连接的,此时就将此连接信息放入等待队列,那么这个队列的大小就是此参数设置

    2. serversocketchannel被设置成了阻塞的模式,也就是说是以阻塞方式接受连接的。

    或许会有疑问。在平时的nio编程中channel不是都要设置成非阻塞模式吗?

    这里解释一下,如果是设置成非阻塞模式那么就必须设置一个 selector不断的轮询,但是接受连接只需要阻塞一个通道即可。

  Tomcat系列(二)- EndPoint源码解析

  这里需要注意一点,每个 acceptor在生成 pollerevent对象放入 poller队列中时都是随机取出 poller对象的,

  所以 poller中的 queue对象设置成了 synchronizedqueue<pollerevent>,因为可能有多个 acceptor同时向此 poller的队列中放入 pollerevent对象。

  具体代码可以看如下,

public poller getpoller0() {
    int idx = math.abs(pollerrotater.incrementandget()) % pollers.length;
    return pollers[idx];
}

什么是操作系统级别的连接呢?

在tcp的三次握手中,系统通常会每一个listen状态的socket维护两个队列,一个是半连接队列(syn):

这些连接已经收到客户端syn;另一个是全连接队列(accept):

这些链接已经收到客户端的ack,完成了三次握手,等待被应用调用accept方法取走使用。

  所有的 acceptor共用这一个连接,在 acceptorrun方法中,放一些重要的代码。

public void run(){
    // loop until we receive a shutdown command
    while(endpoint.isrunning()){
        try{
            //如果到了最大连接数,线程等待
            endpoint.countuporawaitconnection();
            u socket = null;             
            try{
                //调用accept方法获得一个连接
                socket = endpoint.serversocketaccept();
            }catch(exception ioe){                        
                // 出异常以后当前连接数减掉1
                endpoint.countdownconnection();                   
            }
            // 配置socket
            if(endpoint.isrunning() && !endpoint.ispaused()){
                // setsocketoptions() will hand the socket off to                   
                // an appropriate processor if successful                      
                if(!endpoint.setsocketoptions(socket)){
                    endpoint.closesocket(socket)        
                }
            } else {
                endpoint.destroysocket(socket);                    
            }        
        }
    }
}    

  里面我们可以得到两点

    1. 运行时会先判断是否到达了最大连接数,如果到达了那么就阻塞线程等待,里面调用的就是 limitlatch组件判断的。

    2. 最重要的就是配置socket这一步了,是 endpoint.setsocketoptions(socket)这段代码

  Tomcat系列(二)- EndPoint源码解析

  其实里面重要的就是将 acceptor与一个 poller绑定起来,然后两个组件通过队列通信,每个poller都维护着一个 synchronizedqueue队列, channelevent放入到队列中,然后 poller从队列中取出事件进行消费。

poller

  我们可以看到 pollernioendpoint的内部类,而它也是实现了 runnable接口,可以看到在其类中维护了一个quene和selector,定义如下。

  所以本质上 poller就是 selector

private selector selector;
private final synchronizedqueue<pollerevent> events =new synchronizedqueue<>();

  重点在其run方法中,这里删减了一些代码,只展示重要的。

  Tomcat系列(二)- EndPoint源码解析

  其中主要的就是调用了 events()方法,就是不断的查看队列中是否有 pollerevent事件,如果有的话就将其取出然后把里面的 channel取出来注册到该 selector中,然后不断轮询所有注册过的 channel查看是否有事件发生。

socketprocessor

  我们知道 poller在轮询 channel有事件发生时,就会调用将此事件封装起来,然后交给线程池去执行。

  那么这个包装类就是 socketprocessor

  而我们打开此类,能够看到它也实现了 runnable接口,用来定义线程池 executor中线程所执行的任务。

  那么这里是如何将 channel中的字节流转换为tomcat需要的 servletrequest对象呢?其实就是调用了 http11processor来进行字节流与对象的转换的。

executor

  executor其实是tomcat定制版的线程池。我们可以看它的类的定义,可以发现它其实是扩展了java的线程池。

public interface executor extends java.util.concurrent.executor, lifecycle

  在线程池中最重要的两个参数就是核心线程数和最大线程数,正常的java线程池的执行流程是这样的。

    1. 如果当前线程小于核心线程数,那么来一个任务就创建一个线程。

    2. 如果当前线程大于核心线程数,那么就再来任务就将任务放入到任务队列中。所有线程抢任务。

    3. 如果队列满了,那么就开始创建临时线程。

    4. 如果总线程数到了最大的线程数并且队列也满了,那么就抛出异常。

  但是在tomcat自定义的线程池中是不一样的,通过重写了 execute方法实现了自己的任务处理逻辑。

    1. 如果当前线程小于核心线程数,那么来一个任务就创建一个线程。

    2. 如果当前线程大于核心线程数,那么就再来任务就将任务放入到任务队列中。所有线程抢任务。

    3. 如果队列满了,那么就开始创建临时线程。

    4. 如果总线程数到了最大的线程数,再次获得任务队列,再尝试一次将任务加入队列中。

    5. 如果此时还是满的,就抛异常。

  差别就在于第四步的差别,原生线程池的处理策略是只要当前线程数大于最大线程数,那么就抛异常,而tomcat的则是如果当前线程数大于最大线程数,就再尝试一次,如果还是满的才会抛异常。

  下面是定制化线程池 execute的执行逻辑。

public void execute(runnable command, long timeout,timeunit unit){
    submittedcount.incrementandget();  
    try{ 
        super.execute(command);
    }catch(rejectedexecutionexception rx){            
        if(super.getqueue() instanceof taskqueue){

            //获得任务队列             
            final taskqueue queue = (taskqueue)super.getqueue();
            try{            
                if(!queue.force(command,timeout,unit)){
                    submittedcount.decrementandget();                
                    throw new rejectedexecutionexception(sm.getstring("threadpoolexecutor.queuefull"));                   
                }         
            }catch(interruptedexception x){
                submittedcount.decrementandget();
                throw new rejectedexecutionexception(x);            
            }    
        }else{
            submittedcount.decrementandget();
            throw rx;        
        }   
    }
}

  在代码中,我们可以看到有这么一句 submittedcount.incrementandget();

  为什么会有这句呢?我们可以看看这个参数的定义。

  简单来说这个参数就是定义了任务已经提交到了线程池中,但是还没有执行的任务个数。

private final atomicinteger submittedcount = new atomicinteger(0);

  为什么会有这么一个参数呢?

  我们知道定制的队列是继承了 linkedblockingqueue,而 linkedblockingqueue队列默认是没有边界的。

  于是我们就传入了一个参数, maxqueuesize给构造的队列。

  但是在tomcat的任务队列默认情况下是无限制的,那么这样就会出一个问题,如果当前线程达到了核心线程数,则开始向队列中添加任务,那么就会一直是添加成功的。

  那么就不会再创建新的线程。那么在什么情况下要新建线程呢?

线程池中创建新线程会有两个地方,一个是小于核心线程时,来一个任务创建一个线程。另一个是超过核心线程并且任务队列已满,则会创建临时线程。

  那么如何规定任务队列是否已满呢?如果设置了队列的最大长度当然好了,但是tomcat默认情况下是没有设置,所以默认是无限的。所以tomcat的 taskqueue继承了 linkedblockingqueue,重写了 offer方法,在里面定义了什么时候返回false。

  Tomcat系列(二)- EndPoint源码解析

  这就是 submittedcount的意义,目的就是为了在任务队列长度无限的情况下,让线程池有机会创建新的线程。

总结

  上面的知识有部分是看着李号双老师的深入拆解tomcat总结的,又结合着源码深入了解了一下,当时刚看文章的时候觉得自己都懂了,但是再深入源码的时候又会发现自己不懂。

  所以知识如果只是看了而不运用,那么知识永远都不会是自己的。

  通过tomcat连接器这一小块的源码学习,除了一些常用知识的实际运用,例如aqs、锁的应用、自定义线程池需要考虑的点、nio的应用等等。

  还有总体上的设计思维的学习,模块化设计,和如今的微服务感觉很相似,将一个功能点内部分为多种模块,这样无论是在以后替换或者是升级时都能游刃有余。