欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

druid 源码分析与学习(含详细监控设计思路的彩蛋)

程序员文章站 2022-06-24 17:58:08
...
    Druid是阿里巴巴公司的数据库连接池工具,昨天突然想学习一下阿里的druid源码,于是下载下来分析了一下。也就2个多小时粗略看了一下,中间有些知识点没见过,不懂,现查BAIDU学习。简单总结一下,边总结边继续看代码,估计错误不少,欢迎指正!

    在自己看之前,想找找druid源码分析,居然在BAIDU上搜索不到任何信息,只是介绍如何配置,只能自己看过来了。这里的介绍,细节不说了,着眼于大方向与设计思路。

1。为监控而生,那么何时,又如何监控呢?
    简单的操作数据库通常涉及datasource,connection,preparedstatement ,ResultSet等东西,如果我要监控这些,必然要建些代理类。
   我们的操作都由代理类完成,在完成的过程中,产生监控的数据。
   druid号称为监控而生,监控功能就是一根针,正所谓见缝插针,没有缝隙就要创造缝隙,所以就要建个代理类,代理类与被代理理之间就是缝隙。而代理对象必然持有被代理对象。
    public interface PreparedStatementProxy extends PreparedStatement, StatementProxy
    作为实现类:PreparedStatementProxyImpl,就持有一个java.sql.PreparedStatement。
    随便看看它的查询方法:
    public ResultSet executeQuery() throws SQLException {
.....
        return createChain().preparedStatement_executeQuery(this);//产生过滤链,并由过滤链执行。
    }
  FilterChainImpl中有:
    public ResultSetProxy preparedStatement_executeQuery(PreparedStatementProxy statement) throws SQLException {
        if (this.pos < filterSize) {
            return nextFilter().preparedStatement_executeQuery(this, statement);
        }
        ResultSet resultSet = statement.getRawObject().executeQuery();
        return wrap(statement, resultSet);
    }

    上面的方法说明:在执行查询前,要经过过滤链处理,等处理完了,再由statement执行,执行完了,得到一个ResultSet后,包装一个产生最后返回的代理类。

2.说说统计过滤器吧,只是过滤链上的一环
   此模式见的最多的就是web.xml中配置的过滤器filter了,你配置几个过滤器filter,都实现了dofilter()方法,那么把filter们组织成来,放在filterchain的List之类的容器中,那么就可以循环执行dofilter来处理些东东。可以看出filterchain持有所有的filter,那filter执行后,要告诉filterchain执行下一个,所以fiter还必须持有filterchain。但filter不需要一直持有filterchain对象,只是临时持有一会,所以filterchain是作为方法的参数传进来的。这也说明filter可以是这个chain中的一环,也可以同时是另一个chain中的一环,谁来都行。
这个过程,又点象观察者模式,又象回调模式。
   看个简单的,StatFilter中的一个统计连接提交的方法:(StatFilter中的过滤方法超级多,对各种数据库操作都记账;当然logfilter中也一样,防御SQL注入攻击的WallFilter,估计也一样:)
    @Override
    public void connection_commit(FilterChain chain, ConnectionProxy connection) throws SQLException {
        chain.connection_commit(connection);
        JdbcDataSourceStat dataSourceStat = chain.getDataSource().getDataSourceStat();
        dataSourceStat.getConnectionStat().incrementConnectionCommitCount();
    }

  先是让chain去作一步(就是nextFilter开始干活,所有的filter都干完了,就真正commit一下)然后,对数据源的commit的操作计数进行增加。
    public void connection_commit(ConnectionProxy connection) throws SQLException {
        if (this.pos < filterSize) {
            nextFilter().connection_commit(this, connection);//让下一个干活
            return;
        }
        connection.getRawObject().commit();//都干完了,才真正提交。这个连接也是一个代理,让里面真正的java.sql.connection提交。
    }
    private Filter nextFilter() {
        Filter filter = getFilters().get(pos++);
        return filter;
    }

DataSourceProxyConfig中有一个private final List<Filter> filters = new ArrayList<Filter>();//就是普通的arraylist放过滤器。

上面两点合在一起,就是原来执行一个数据库操作,现在给代理类执行,执行中先经过一个个过滤器进行统计,之后再真正执行数据库操作。对最终用户透明的。

3.统计的东东怎么记录的呢?
在stat包里面,随便找一个对象看看吧。比如:JdbcStatementStat
..........................
    private final AtomicLong    createCount      = new AtomicLong(0);                                     // 执行createStatement的计数
    private final AtomicLong    prepareCount     = new AtomicLong(0);                                     // 执行parepareStatement的计数
    private final AtomicLong    prepareCallCount = new AtomicLong(0);                                     // 执行preCall的计数
    private final AtomicLong    closeCount       = new AtomicLong(0); 

.....................................
哇哦,一大堆统计计数器,都是AtomicLong的,就是线程同步的,增加计数时调用它的incrementAndGet()方法。不过TableStat中就是普通的int了,呵呵。

4.说说最上面的蓝字的执行
ResultSet resultSet = statement.getRawObject().executeQuery();
     PreparedStatementProxy代理了一个实现了java.sql.statement接口的对象,所以它用那个对象执行查询。那个对象是什么样的呢?
     对了,是这个:DruidPooledPreparedStatement,看的出,这个家伙也是代理了别人,因为它用stat来做查询,只是代理前后做了点其它事情。
    public ResultSet executeQuery() throws SQLException {
        checkOpen();
        incrementExecuteCount();
        transactionRecord(sql);
        oracleSetRowPrefetch();
        conn.beforeExecute();
        try {
            ResultSet rs = stmt.executeQuery();
            if (rs == null) {
                return null;
            }
            DruidPooledResultSet poolableResultSet = new DruidPooledResultSet(this, rs);
            addResultSetTrace(poolableResultSet);
            return poolableResultSet;
        } catch (Throwable t) {
            throw checkException(t);
        } finally {
            conn.afterExecute();
        }
    }

    其中:addResultSetTrace是把查询结果放在List<ResultSet>       resultSetTrace;中。为何?

5.看看几个Holder是干什么的?
DruidConnectionHolder中有什么?new DruidPooledConnection时,正好用这个holder。
    private final DruidAbstractDataSource       dataSource;
    private final Connection                    conn;
    private final List<ConnectionEventListener> connectionEventListeners = new CopyOnWriteArrayList<ConnectionEventListener>();
    private final List<StatementEventListener>  statementEventListeners  = new CopyOnWriteArrayList<StatementEventListener>();
    private PreparedStatementPool               statementPool;   //这是一个LRU算法的池。放的就是下面的PreparedStatementHolder!!!!
    private final List<Statement>               statementTrace           = new ArrayList<Statement>(2);

PreparedStatementHolder中呢?new DruidPooledPreparedStatement时,正好用这个holder。
    private final PreparedStatementKey key;
    private final PreparedStatement    statement;


Holder从名字来看就是持有什么,DruidConnectionHolder必然持有Connection,PreparedStatementHolder必然持有PreparedStatement。DruidConnectionHolder当然还持有属于这个连接的PreparedStatement之类的。透过几个调用关系,差不多可以猜测出设计思路:
一般我们用connect对象,再产生statment对象,再执行SQL之类的,当我们在一个对象执行前后做一些统计之类的操作时,那就用代理对象来做,比如前面的filterchain用于代理对象中。可是如果调用其它对象时,想把它们之间的一些关联的东东保持下来,比如一个连接下的所有的PreparedStatement,那就需要一个holder对象来帮忙了。也许你可以把一堆其它的东东都给这个对象身上,不过这样就不清晰了,太乱了。
没准holder也可以称为一个设计模式。当然proxy也是,还记得有叫handler的吧,都有行为学上的意义!!
DruidPooledConnection中的PreparedStatement prepareStatement(String sql)方法,就是看看池子里有没有(stmtHolder = holder.getStatementPool().get(key);),没有的话才new PreparedStatementHolder(key, conn.prepareStatement(sql));,有的话内存容器中取了。

大概关系这样的:DruidPooledConnection-->DruidConnectionHolder-->ConnectionProxy-->filterChain---connection。

6.讲讲上面用到的LRU缓存,就是存PreparedStatementHolder的池子。
  一种LinkedHashMap吧,自己实现一下removeEldestEntry方法就可以了,容量达到就扔掉最OLD的。
    public class LRUCache extends LinkedHashMap<PreparedStatementKey, PreparedStatementHolder> {
        private static final long serialVersionUID = 1L;
        public LRUCache(int maxSize){
            super(maxSize, 0.75f, true);
        }
        protected boolean removeEldestEntry(Entry<PreparedStatementKey, PreparedStatementHolder> eldest) {
            boolean remove = (size() > dataSource.getMaxPoolPreparedStatementPerConnectionSize());
            if (remove) {
                closeRemovedStatement(eldest.getValue());
            }
            return remove;
        }
    }


7.MOCK包
  这个也不太懂,网上查了一下,说是一些造假的,且方便测试的对象,它实现了相关的接口,所以可以被当成它所假冒的东西使用。特别是真实的东西不方便使用,或者很慢,或者有其它不理想的情况下。

8,sqlPaser
  druid下的主要的功能包除这个以外,都介绍完了,说是SQL解析器,不过没空看了,下次看了再补充吧。

9.connectPool连接池
  连接池当然是重头戏了,简单先提一下,主要用到的是ReentrantLock锁,还有 notEmpty empty两个条件,生产连接与消费连接的线程在两个条件上等待与唤醒。连接池是由数据源确定的,所以具体要看pool包里的DruidAbstractDataSource与DruidDataSource两个类了。
哇,这两个类很庞大,首先看了一下属性,主要有很多count,time,还有一些default值,留意集合字段,比如Map<DruidPooledConnection, Object> activeConnections
private volatile DruidConnectionHolder[] connections;之类的。里面还有些线程。
9.1创建连接
  先看看DruidDataSource里的CreateConnectionThread都干什么,首先是一些条件,比如下面这个代码(省略不重要的代码),连接太多了的时候,在empty条件上等待,就是等空了再运行,现在别急着创建连接,等着吧!
// 防止创建超过maxActive数量的连接
                        if (activeCount + poolingCount >= maxActive) {
                            empty.await();
                            continue;
                        }
                try {
                    connection = createPhysicalConnection();
                    setFailContinuous(false);
                boolean result = put(connection);

后面是创建一个物理连接,然后put一下,这个put很可能是放池子中,那么仔细看一下。主要下面几句,说明都写在后面了:
    holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);//产生一个连接holder
            connections[poolingCount] = holder;//这不就是池子吗?就是一个DruidConnectionHolder的Array了。
            incrementPoolingCount();//池子中的计数加1.
            notEmpty.signal();//发出非空的信号,所有在非空条件上等待的线程,你们可能动起来了。
            notEmptySignalCount++;
9.2使用连接
    那么谁在notEmpty条件上等待呢?我们查一下,发现是方法  DruidConnectionHolder takeLast()之中,当poolingCount中数量为0时等待。正好说明使用连接的线程,当连接没有时,就等待着呗。如果池中有连接呢?就执行下面的语句了:
        decrementPoolingCount();//减少池中连接的计数,当然拿走一个少一个了。
        DruidConnectionHolder last = connections[poolingCount];//正好拿走的是池中最后一个。
        connections[poolingCount] = null;//最后一个就成null了。
    顺便看看谁在用takeLast(),查找发现是getPooledConnection(),从名字就知道是获取链接的主要方法了。仔细看看getPooledConnection中又调用getConnection(),这里面又是插入了过滤链,果然是为统计而生,都记录在案了。仔细看filterChain.dataSource_connect()参数中有this,说明它把自己传进去了,说明这个filterChain并不从属于任何datasource,可以是这具数据源,也可以是那个数据源。具体过滤哪个,临时传入。
    当我们设计过滤链时,如果我们的功能是为多人服务的,那就说明要传入服务对象进来。而不是setpropety设置成关联关系。一个人如何设计复杂的代码呢?当然是头脑中有一个非常抽象,而明确的思路。
9.2减少连接
在创建连接线程附近还有一个DestroyConnectionThread(),看看吧
跟踪里面,有destroyTask.run();----->shrink(true);看名字是收缩嘛,可能连接空闲的太多了,就缩小呗。
在shrink()方法中,重点有下面的语句,分析放在后面:
final int checkCount = poolingCount - minIdle;//池中的数量-最小空闲数量,感觉是收缩的条件之一嘛。
        for (DruidConnectionHolder item : evictList) {//可回收的都放在这里了
            Connection connection = item.getConnection();
            JdbcUtils.close(connection);//关闭这些连接了。
            destroyCount.incrementAndGet();
        }


9.3 初始化方法init()
都是谁在用这两个线程呢?查找一下,发现创建线程与收缩线程是由void init()来调用的,看名字就知道这是系统启动的主方法了。
void init(){
     initFromSPIServiceLoader();//load filters from SPI ServiceLoader,这个spi就不介绍了,在分析dubbo的另一个帖子里,里面已经有SPI介绍了。反正是把配置的filter放在filterchain中(List<Filter>)
connections = new DruidConnectionHolder[maxActive];//新建连接池,个数是最大活动连接数maxActive。
                for (int i = 0, size = getInitialSize(); i < size; ++i) {//放入连接池中连接
                    PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
                    DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
                    connections[poolingCount] = holder;
                    incrementPoolingCount();
                }
            createAndLogThread();//看名字是日志,就不看了
            createAndStartCreatorThread();//创建连接的线程,一直在工作,池子满了就是等待状态。
            createAndStartDestroyThread();//收缩池子的线程,一直在工作。

            initedLatch.await();//主线程在计数器为0前一直等待。
    init = true;
}

就里有一个知识点。CountDownLatch             initedLatch             = new CountDownLatch(2); 就叫倒计时同步器。当前同步数为2,在变成0后,主线程才能运行,否则一直等待中。
在创建连接与收缩池子的线程中都有initedLatch.countDown();,一共正好两个,那么主线程就是等待上面两个线程都运行了才运行吧,才置init状态标识为true。看来理解没有错喽。

单看一个create线程,里面有一个countDown方法:
    protected void createAndStartCreatorThread() {
        if (createScheduler == null) {
            String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);
            createConnectionThread = new CreateConnectionThread(threadName);
            createConnectionThread.start();
            return;
        }
        initedLatch.countDown();//如果有createScheduler就直接-1;
    }
    public class CreateConnectionThread extends Thread {
        public void run() {
            initedLatch.countDown();//如果没有时,在这里面-1;


10。总结:

看过了源码?我们究竟如何提高?我们也可以做出这么好的东东吗?
1.首先你会掌握足够多的基础知识,比如会用到多线程,concurrent包里的东西,甚至很少用的容器。说明作者看过很厚的资料书或者看过很多java源码。
2.深入学习国外的源码。我发现一些相似的处理方式,在这个产品中出现,在那个产品中也出现,当然不一定完全相同。比如hadoop中的异步转同步与dubbo就差不多,什么 fastfail在秒杀中也有人用。
3.看过两个源码了,应该可以抽象出处理问题的方式了,这样自己碰到相似情况时,马上就可以套用。
【彩蛋】druid 源码分析与学习(含详细监控设计思路的彩蛋)
            
    
    博客分类: druidjavadatabase源码 druidjavadatabase  druid 源码分析与学习(含详细监控设计思路的彩蛋)
            
    
    博客分类: druidjavadatabase源码 druidjavadatabase 
基本上看完了主要代码,为监控而生这句体会更深了,我们就择其一点,深入体会一下过滤链模式吧,算是本贴的彩蛋。这个东东真的很多地方在用,那么我们如何用呢?使用时,应该有三个对象,分别是被处理的对象,过滤器,过滤器链,之前的帖子我提出一个观点,对象的本质是各种关系的组合,组合是最重要的。那么这两个对象之间的包含关系,引用关系,以及主要的方法应该怎么设计,以及为什么要这么设计呢?
过滤器是一个基本单元,特点是:它不会引用过滤器链,因为它可以属于不同的过滤器链。它不会引用处理对象,因为它可以处理这个对象也可以处理那个对象。所以在这三者中,过滤器的过滤方法中,会传入另外两个对象,而不会在其它属性和方法中产生。web中的过滤器中的dofilter参数就是chain与req与res三个。
过滤链一般都是可装配的,过滤器是一个个基本单元,所以链条要给用户配置的机会,过滤器以后可能会增加,但这个不是给用户的功能,可以通过spi的机制加进来。
过滤器链条是一个组合器,它的特点:产生过滤器时,必须把基本过滤器传进来,而且是稳定的引用关系,但因为链条上有多个过滤器,所以要有一个容器来放它们。所以过滤链持有一个容器,init的时候,放入一个个过滤器。过滤器是串起来一个个执行的,所以还要有一个定位信息,现在执行到那一个了。再深入思考一下,过滤链处理的对象,有的是处理到第2个,有的处理到第5个,有的处理到第3个。那多线程的问题来了。查一下代码(protected int   pos = 0;if (this.pos < filterSize) donext),看来pos定位计数不是一个threadlocal啊,再看public FilterChainImpl(DataSourceProxy dataSource),构造函数中传入的是数据源,所以一个数据源不会传入计数指针,只会传过来过滤器容器。回头我想查一下web中filter的源码,里面的容器是啥,定位是啥?是不是threadlocal变量呢?
( 补一下,后来在tomcat这里查到了:org.apache.catalina.core.ApplicationFilterChain的属性中 private ApplicationFilterConfig[] filters = new ApplicationFilterConfig[0];  private int pos = 0; 还包含了beforeFilter和afterFilter等功能,作者肯定学习到这些过滤代码了,而我too young了,看的东西太少了)
有点复杂了,再看看FilterChainImpl,里面并不持有过滤单元,而其中的重要方法,getFilters、nextFilter,都是从构造chain的dataSource中来的。有点象什么呢?象以前你去找饭店吃饭,现在你带着菜让饭店加工。有道理啊,因为过滤链规则并不一定要持有基本单位啊,就象冒泡排序不一定持有排序元素,是可以单独抽象出来的。过滤链既然含有非线程的pos,那么每个过滤链条都是一次性使用,否则pos指针就乱了。那么查找一下工程中所有的new FilterChainImpl(),我们发现pool包里有4个,其它都在proxy包里,正好证明了前面说的见缝插针,弄出个代理来,在代理与被代理之间插入过滤链,或者其它什么你想要的功能。比如ClobProxyImpl中有createChain()方法,而且每个clob操作中都要调用createChain,就是说每个操作都是新的过滤链,那么前面提到的pos计数就是仅自己方法使用,不会有什么共享冲突的发生了。

我们再理一下思路,三个主要的元素(被处理的对象,过滤器,过滤器链),其中过滤器链被拆分了,只剩下过滤规则与计数器。过滤容器交给了被处理对象持有了,变来料加工。过滤链条是每一个proxy对象、准确的说是每一个proxy对象的每一个具体要使用过滤的方法临时create出来的。再理一个思路,我们看一个filter,看接口与对象都成,发现里面有处理connect的一大堆方法,有处理resultset的一大堆方法,有处理statemente的一大堆方法,有处理....。

哇哦,太多了,所有的都在这里,每一个过滤器都很庞大,很全面,是重量级对象,只有一份。一个数据源持有几个过滤器,过滤链不是细长形状,而是矮胖形状的。而你的任何一个对象(当然是代理对象喽)的任何一个简单操作,都会生成一个过滤链。所以你的过滤链中不应该持有过滤器,持有每一个简单计数pos是非常合理的,过滤链是一个轻量级的对象。

再引申思考一下,如果我们一开始想到一个功能,比如这么一个监控功能,我们可以肯定的知道用filter方式,我们能否如这般组织好这些对象之间的关系呢?而且这么组织有没有什么进一步重构的可能呢?
我们可能会有一批与连接相关的过滤器,一批与resultset有关的过滤器;从另一个角度,我们有统计的过滤器,有日志的过滤器。我们还有过滤链规则,我们有很多要过滤的对象,可以先打散,比如过滤链的规则与过滤链容器是分开的。再尝试组合,可以按连接来组合(连接的统计,连接的日志在一起),可以从功能上组合(连接日志与resultset日志在一起)。那么我们插拨的要么是整个统计,要么是整个日志。如果另一种组合,我们可以只要连接的统计与日志,可以只要查询的日志与统计。但如果要连接的统计,要查询的日志,那么druid目前是不支持的。这有点象拆成块,再搭积木的感觉。
再比如说,现在由datasource来持有过滤器容器。那么过滤器的粒度只是不同的数据源。有些过滤是对connect的,有些是对resultset的,所有的数据源的下级对象都是一样的过滤器容器,这方面有没有个性化的需求呢?比如对clob我只想日志不想统计,createChain的时候是不是有什么参数可以配置一下?

另外,在看FilterChainImpl时,它除了有一大批要过滤的方法外,还有好几个wrap方法。wrap恰好是根据原始对象来生成过滤缝隙的代理对象,同时把产生filterchain的条件都传给它,让代理对象执行每一个方法都可以new filterchain出来。而在代理对象的每一个具体方法中,调用filterchain来处理时,又把代理自己给filterchain,filterchain执行过程中,还要用代理对象的getRawObject得到被代理对象来执行最后的业务。如果代理对象连续执行了三个不同的方法,那就是第一次new 一个filterchain,一个链条处理完了就会recycleFilterChain(chain);重置pos计数,后面两个方法就还是用这个代理对象持有的filterchain,不过计数变成0了。

再理一下:我有一个原始对象,通过FilterChainImpl产生代理对象,代理对象再传给FilterChainImpl的具体方法用,方法里面进行过滤后再取出原始对象来用。再比喻:我有一个不锈钢杯子,让别人给包上泥巴,成了一个泥杯子,再让别人做烧制,着色,刻画等处理。最后成为精美的陶瓷水杯,当然喝水功能还是靠原来的不锈钢杯子来实现的,因为它不是不不锈钢菜刀。当然菜刀也可以包上泥巴,也做烧制,着色,刻画等处理,最后成为一个陶瓷刀具。包泥巴和烧制,着色,刻画当然是不同的业务,但可以开一个店来做。

再全面总结一下:
关于druid,配置一个数据源,它持有一堆过滤器零件,持有一个连接池,连接池里有两个主要的线程在跑。我对任何对象做任何操作时,给我包装一下,再产生或重置一个过虑链并过滤处理后再执行操作,就酱紫。

druid 源码分析与学习(含详细监控设计思路的彩蛋)
            
    
    博客分类: druidjavadatabase源码 druidjavadatabase 
  • druid 源码分析与学习(含详细监控设计思路的彩蛋)
            
    
    博客分类: druidjavadatabase源码 druidjavadatabase 
  • 大小: 379.7 KB