欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

commons-pool-1.6部分源码分析

程序员文章站 2022-05-22 18:44:47
...
类图关系

为了简单起见,该图只表现继承和实现关系

commons-pool-1.6部分源码分析
            
    
    博客分类: java java对象池dbcp 

大部分情况下我们只使用ObjectPool和PoolableObjectFactory的相关实现类,我们重点分析GenricObjectPool类


配置参数

类GenericObjectPool.Config定义了相关参数:
// 池中最大空闲实例个数
int maxIdle=8

// 池中最小空闲实例个数 (evictor会用到)
int minIdle=0

// 可以从池中拿走的最多实例的个数,就是通常我们说的池大小
// 小于0则代表没有限制
int maxActive=8

// 当池耗尽并且whenExhaustedAction的值是WHEN_EXHAUSTED_BLOCK时候,
// borrowObject方法最大等待时间(毫秒),超过改时间则抛出异常
// (意思就是当池被耗尽时,从池中获取对象要等的时间)
// 小于0时一直阻塞
long maxWait = -1

// 当池被耗尽时,borrowObject方法应该采取的措施
byte whenExhaustedAction = WHEN_EXHAUSTED_BLOCK

// 默认是false,当是true得时候,borrowObject方法在返回之前
// 会调用自定义的PoolableObjectFactory.validateObject方法,校验
// 要返回的对象是否可用.如果不可用,该对象会被销毁,然后
// 会尝试生成另一个对象.
boolean testOnBorrow = false

// 默认是false,当时true的时候,当调用retruenObject将对象放回
// 池中时会调用PoolableObjectFactory.validateObject方法,校验对象   
// 是否可用. (如果不可用会销毁该对象) 
boolean testOnReturn = false
      
// 默认是fasle,当是true时,闲置对象驱逐器会调用validateObject
// 方法,校验对象是否可用,如果不可用会将其从池中drop掉
boolean testWhileIdle = false
        
// 闲置对象驱逐器运行的间隔时间(毫秒),如果是非正数则驱逐器将不会运行.
// (驱逐器可以确保池内对象不小于minIdle)
long timeBetweenEvictionRunsMillis = -1

// 驱逐器每次运行时检查池中闲置对象的最大个数
// (比如该值设置为3,此时池中有5个闲置对象,那么每次只会检查前三个闲置对象。
// 比如检查闲置对象是否可用等。ensureMinIdle()不受这个影响,因为是做完检查
// 后才执行ensureMinIdle)
int numTestsPerEvictionRun = 3

// 一个对象可以停留在池中的最少闲置时间,如果该对象在池中
// 的闲置时间大于该值,那么该对象就可以被驱逐器dorp掉。
// 如果是非正数,表示该对象可以一直闲置下去。(单位毫秒)
long minEvictableIdleTimeMillis = 1000L * 60L * 30L

// 和minEvictableIdleTimeMillis作用一样,但有一个额外条件:
// 如果池中闲置对象的个数不大于minIdle时,即使有对象的
// 闲置时间大于该设置的值也不会被dorp掉.
// 注意:如果minEvictableIdleTimeMillis>0则该参数就不在起作用.
long softMinEvictableIdleTimeMillis = -1

// 默认LIFO状态.true的意思是borrowObject方法返回池中的最近
// 使用过的闲置对象(如果该实例可用). fasle的意思是池是一个
// FIFO队列--对象从池中顺序的返回.
boolean lifo = true


从池中获取对象
从池中获取对象有borrowObject()方法负责,那么该方法到底做了什么?
public T borrowObject() throws Exception {
  long starttime = System.currentTimeMillis();
  Latch<T> latch = new Latch<T>();   //代表一个请求
  byte whenExhaustedAction;
  long maxWait;
  synchronized (this) {
      // Get local copy of current config. Can't sync when used later as
      // it can result in a deadlock. Has the added advantage that config
      // is consistent for entire method execution
      whenExhaustedAction = _whenExhaustedAction;
      maxWait = _maxWait;
  
      // Add this request to the queue
      // 请这个请求放入到请求对列.由下面的allocate()方法进行分配
      _allocationQueue.add(latch);
  }
  // Work the allocation queue, allocating idle instances and
  // instance creation permits in request arrival order
  //操作分配队列 ,为请求顺序分配空闲实例和实例创建许可     
  allocate();
  
  for(;;) {
      synchronized (this) {
          assertOpen();
      }
  
      // If no object was allocated from the pool above
      // 没有从池中分配出一个池对象
      if(latch.getPair() == null) {
        // check if we were allowed to create one
        // 检查是否允许创建一个池对象
        if(latch.mayCreate()) {
            // allow new object to be created
            // 允许创建一个池对象
        } else {
          // the pool is exhausted
          // 池被耗尽时执行以下耗尽策略
          switch(whenExhaustedAction) {
            case WHEN_EXHAUSTED_GROW:  //该策略可以忽略不能创建新池对象的限制
              // allow new object to be created
              synchronized (this) {
                 // Make sure another thread didn't allocate us an object
                 // or permit a new object to be created
              	 // 确保这是其他线程没有给当前请求(latch)关联一个池对象
                 // 或者授权可创建新对象
                 if (latch.getPair() == null && !latch.mayCreate()) {
                     //将请求从请求队列中移除,这时该请求就变为可用请求
                     _allocationQueue.remove(latch); 
                     _numInternalProcessing++;
                 }
              }
              break;
            case WHEN_EXHAUSTED_FAIL:  //该策略直接抛异常
              synchronized (this) {
                 // Make sure allocate hasn't already assigned an object
                 // in a different thread or permitted a new object to 
                 // be created
                 if (latch.getPair() != null || latch.mayCreate()) {
                     break;
                 }
                 _allocationQueue.remove(latch);
              }
              throw new NoSuchElementException("Pool exhausted");
            case WHEN_EXHAUSTED_BLOCK: //该策略让请求等待一段时间
              try {
                 // 该同步块用latch作为锁原因:
                 // 1.和allocate()方法中的latch同步块配合;
                 // 2.减小锁的范围,如果都用this锁,那么走到这里时所有的线程都要
                 // 顺序执行,但是用latch就不用,因为各个线程持有的锁(latch)不一样
                 // 所以这里对不同的线程可以并发执行。
                 synchronized (latch) {
                     // Before we wait, make sure another thread didn't 
                     // allocate us an object
                     // or permit a new object to be created
                     // 等待之前确保其他线程没有给分配一个池对象或授权创建新池对象
                     if (latch.getPair() == null && !latch.mayCreate()) {
                         if(maxWait <= 0) {  //最大等待时间小于等于0就一直等待
                             latch.wait();
                         } else {  //计算需要等待的时间并执行等待
                             // this code may be executed again after a 
                             // notify then continue cycle
                             // so, need to calculate the amount of time 
                             //to wait
                             final long elapsed = (System.currentTimeMillis() - starttime);
                             final long waitTime = maxWait - elapsed;
                             if (waitTime > 0){
                                 latch.wait(waitTime);
                             }
                         }
                     } else {
                         break;
                     }
                 }
                 // see if we were awakened by a closing pool
                 if(isClosed() == true) {
                     throw new IllegalStateException("Pool closed");
                 }
              } catch(InterruptedException e) { //被其他线程提前中断
                 boolean doAllocate = false;
                 synchronized(this) {
                     // Need to handle the all three possibilities
                     // 需要处理三种可能性
                     if (latch.getPair() == null && !latch.mayCreate()) {
                         // Case 1: latch still in allocation queue
                         // Remove latch from the allocation queue
                         // 情况1:请求仍然在请求队列,还没有分配池对象
                         _allocationQueue.remove(latch);
                     } else if (latch.getPair() == null 
                                             && latch.mayCreate()) {
                         //Case 2:latch has been given permission to create
                         //         a new object
                         //情况2:请求已经被赋予可以创建新池对象
                         _numInternalProcessing--;
                         //腾出一个坑儿,让给别人
                         doAllocate = true;
                     } else {
                         // Case 3: An object has been allocated
                         //  情况3:请求已经分配了一个池对象
                         _numInternalProcessing--;
                         //下面的returnObject方法会将该值减一,所以这里先加一
                         _numActive++;   
                         //以上两句可以理解为将关联池对象返回给了外部
                         //而这语句表示将外部的池对象放入到池中
                         returnObject(latch.getPair().getValue());
                     }
                 }
                 if (doAllocate) {
                     allocate();
                 }
                 
                 //发生中断异常后,中断状态已经被清除.
                 //重置线程中断状态
                 Thread.currentThread().interrupt(); //?
                 throw e;
              }
              
              // 如果线程等待超时、产生虚假唤醒(没有被通知、中断或超时)
              // 或者其他线程调用latch.notify则会走到这一步
              if(maxWait > 0 
                    && ((System.currentTimeMillis() - starttime) 
                              >= maxWait)) {// 超时
                 synchronized(this) {
                    // Make sure allocate hasn't already assigned an object
                    // in a different thread or permitted a new object to 
                    // be created
                    //超时后仍没有获取到一个关联池对象或者新创建许可
                    if (latch.getPair() == null && !latch.mayCreate()) {
                        // Remove latch from the allocation queue
                        _allocationQueue.remove(latch);
                    } else {//在等待的过程中获取到一个关联池对象或者新创建许可
                        break;
                    }
                 }
                 throw new NoSuchElementException("Timeout waiting "+
                                                       "for idle object");
              } else { //虚假唤醒或其它线程调用了latch.notify
                 continue; // keep looping
              }
            default:
                throw new IllegalArgumentException("WhenExhaustedAction property " +
                        whenExhaustedAction +" not recognized.");
          }
        }
      }
  
      // 走到这里说明允许创建一个新池对象或者已经关联了一个池对象
      boolean newlyCreated = false;  //是否已经创建了池对象
      if(null == latch.getPair()) { //没有关联一个吃对象,但是允许创建一个池对象
          try {
              T obj = _factory.makeObject();
              latch.setPair(new ObjectTimestampPair<T>(obj));  //关联池对象
              newlyCreated = true;
          } finally {
              if (!newlyCreated) {//池对象创建失败
                  //object cannot be created
                  //因为之前在allocate()方法中在设置_mayCreate值时
                  //将_numInternalProcessing++
                  //所以如果创建池对象失败就将可用请求个数减一
                  synchronized (this) {
                      _numInternalProcessing--;
                      // No need to reset latch - about to throw exception
                  }
                  
                  // 腾出来一个坑儿,让给其他人
                  allocate();
              }
          }
      }
      // activate & validate the object
      // 走到这里说明已经获取到了一个池对象(新创建的或者从池中拿的)
      // 激活和校验池对象是否可用
      try {
          _factory.activateObject(latch.getPair().value);
          
          //只有_testOnBorrow值为true时才对关联的池对象进行校验
          if(_testOnBorrow &&
                  !_factory.validateObject(latch.getPair().value)) {
              throw new Exception("ValidateObject failed");
          }
          
          //池对象返回给外部前,可用请求个数减一,池外活跃对象个数加一
          synchronized(this) {
              _numInternalProcessing--;
              _numActive++;
          }
          
          //返回池对象
          return latch.getPair().value;
      } catch (Throwable e) { //??
          PoolUtils.checkRethrow(e);//??
          // object cannot be activated or is invalid
          // 池对象不能被激活或者校验失败
          try {
              _factory.destroyObject(latch.getPair().value);
          } catch (Throwable e2) {
              PoolUtils.checkRethrow(e2);
              // cannot destroy broken object
          }
          synchronized (this) {
              _numInternalProcessing--;
              // 请求已关联到一个池对象,但是池对象不可用
              if (!newlyCreated) {
              	//重置请求,并将该请求放入到请求队列
              	//后续再调用一次allocate()方法继续从池中拿空闲对象
                  latch.reset();
                  _allocationQueue.add(0, latch);
              }
          }
          allocate();
          //走到这里并且newlyCreated=true
          //那么说明池中已经没有空闲对象,并且新创建的池对象不可用,那么直接抛异常
          if(newlyCreated) {
              throw new NoSuchElementException("Could not create a validated "+
                                        "object,"+ " cause: " + e.getMessage());
          //走到这里且newlyCreated=false
          //说明请求(latch)从池中获取的对象不可用,那么继续循环从池中寻找空闲对象
          //如果池中没有空闲对象并且也不允许再创建新池对象,
          //那么会进入池耗尽策略switch(whenExhaustedAction)代码块
          }else {  
              continue; // keep looping
          }
      }
  }
}
 


我们可以看到borrowObject方法中又很多地方都调用了一个内部函数allocate(),
该方法主要负责为每一个请求(latch)分配对象,具体实现如下:

private synchronized void allocate() {
        if (isClosed()) return;

        // First use any objects in the pool to clear the queue
        // 首先,用池中的对象给请求队列中的请求分配对象
        for (;;) {
            //如果有请求队列有请求,且池中又空闲对象
            if (!_pool.isEmpty() && !_allocationQueue.isEmpty()) {
                Latch<T> latch = _allocationQueue.removeFirst();
                latch.setPair( _pool.removeFirst());  //从池中拿走一个对象给请求(latch)
                _numInternalProcessing++;   //正在分配中得对象的个数
                synchronized (latch) {
                	//当池中对象已经耗尽且when_exhausted_block值是WHEN_EXHAUSTED_BLOCK
                	//这个时候来的请求会根据过期时间在同步块中等待。
                	//这语句让让等待(latch.wait(xxx))的请求激活
                    latch.notify();	
                }
            } else {  //跳出,直到将池中得对象分配完毕
                break;
            }
        }

        // Second  any spare capacity to create new objects
        // 然后,根据池的配置决定是否可以为请求队列中的请求(latch)创建新的池对象
        for(;;) {
            //_maxActive 最大活跃对象个数
            //_numActive 当前在池外的活跃对象个数
            //_numInternalProcessing 可用请求(已分配池对象或_mayCreate标记为true的latch)的个数
            if((!_allocationQueue.isEmpty()) && (_maxActive < 0
                     || (_numActive + _numInternalProcessing) < _maxActive)) {
                Latch<T> latch = _allocationQueue.removeFirst();
                latch.setMayCreate(true);  //设置该请求(latch)的可创建标志为true
                _numInternalProcessing++;
                synchronized (latch) {
                	//通知等待的latch可以创建池对象了
                    latch.notify();	
                }
            } else {
                break;
            }
        }
    } 




未完待续...
  • commons-pool-1.6部分源码分析
            
    
    博客分类: java java对象池dbcp 
  • 大小: 86.7 KB