连接池-Mybatis源码
持续更新:https://github.com/dchack/mybatis-source-code-learn
mybatis连接池
有这么个定律,有连接的地方就有池。
在市面上,可以适配mybatis datesource的连接池有很对,比如:
mybatis也自带来连接池的功能,先学习下mybatis的,相对简单的实现。
涉及的类:
poolstate
public class poolstate { protected pooleddatasource datasource; // 空闲连接集合 protected final list<pooledconnection> idleconnections = new arraylist<pooledconnection>(); // 正在使用的连接集合 protected final list<pooledconnection> activeconnections = new arraylist<pooledconnection>(); // 请求次数,每次获取连接,都会自增,用于 protected long requestcount = 0; // 累计请求耗时,每次获取连接时计算累加,除以requestcount可以获得平均耗时 protected long accumulatedrequesttime = 0; // 累计连接使用时间 protected long accumulatedcheckouttime = 0; // 过期连接次数 protected long claimedoverdueconnectioncount = 0; protected long accumulatedcheckouttimeofoverdueconnections = 0; // 累计等待获取连接时间 protected long accumulatedwaittime = 0; // 等待获取连接的次数 protected long hadtowaitcount = 0; // 连接已关闭的次数 protected long badconnectioncount = 0; public poolstate(pooleddatasource datasource) { this.datasource = datasource; } public synchronized long getrequestcount() { return requestcount; } public synchronized long getaveragerequesttime() { return requestcount == 0 ? 0 : accumulatedrequesttime / requestcount; } public synchronized long getaveragewaittime() { return hadtowaitcount == 0 ? 0 : accumulatedwaittime / hadtowaitcount; } public synchronized long gethadtowaitcount() { return hadtowaitcount; } public synchronized long getbadconnectioncount() { return badconnectioncount; } public synchronized long getclaimedoverdueconnectioncount() { return claimedoverdueconnectioncount; } public synchronized long getaverageoverduecheckouttime() { return claimedoverdueconnectioncount == 0 ? 0 : accumulatedcheckouttimeofoverdueconnections / claimedoverdueconnectioncount; } public synchronized long getaveragecheckouttime() { return requestcount == 0 ? 0 : accumulatedcheckouttime / requestcount; } public synchronized int getidleconnectioncount() { return idleconnections.size(); } public synchronized int getactiveconnectioncount() { return activeconnections.size(); } }
注意代码中的字段都是用protected修饰的,表示pooled包内都可访问,在写这份代码的时候必然默认这个包下实现一个独立的功能,内部字段都可以共享使用,否则都写set,get方法太麻烦了。
在poolstate
类中,很多指标比如requestcount
,claimedoverdueconnectioncount
等都不和连接池核心逻辑相关,纯粹只是表示连接池的一些指标而已。
作为连接池,在这里最重要的就是两个list:
- idleconnections
- activeconnections
这两个都是arraylist,所以在整个实现中我们是通过synchronized
关键字来处理并发场景的。
pooledconnection
组成池的两个list中存储的是pooledconnection
,而pooledconnection
通过java动态代理机制实现代理真正connection。pooledconnection
继承invocationhandler
,所以实现了invoke
方法:
/* * required for invocationhandler implementation. * * @param proxy - not used * @param method - the method to be executed * @param args - the parameters to be passed to the method * @see java.lang.reflect.invocationhandler#invoke(object, java.lang.reflect.method, object[]) */ @override public object invoke(object proxy, method method, object[] args) throws throwable { string methodname = method.getname(); if (close.hashcode() == methodname.hashcode() && close.equals(methodname)) { datasource.pushconnection(this); return null; } else { try { if (!object.class.equals(method.getdeclaringclass())) { // issue #579 tostring() should never fail // throw an sqlexception instead of a runtime checkconnection(); } return method.invoke(realconnection, args); } catch (throwable t) { throw exceptionutil.unwrapthrowable(t); } } } private void checkconnection() throws sqlexception { if (!valid) { throw new sqlexception("error accessing pooledconnection. connection is invalid."); } }
主要看到这个代理实现处理了close
方法,就是将连接从使用列表中弹出。
对于其他方法,会判断方法是否属于object中的方法,如果不是则进行连接合法的校验,然后执行真正connection
即realconnection
中对应的方法。
获得一个代理类的代码,即调用proxy.newproxyinstance
方法,在pooledconnection
中的构造函数中:
/* * constructor for simplepooledconnection that uses the connection and pooleddatasource passed in * * @param connection - the connection that is to be presented as a pooled connection * @param datasource - the datasource that the connection is from */ public pooledconnection(connection connection, pooleddatasource datasource) { this.hashcode = connection.hashcode(); this.realconnection = connection; this.datasource = datasource; this.createdtimestamp = system.currenttimemillis(); this.lastusedtimestamp = system.currenttimemillis(); this.valid = true; this.proxyconnection = (connection) proxy.newproxyinstance(connection.class.getclassloader(), ifaces, this); }
我们可以看到realconnection
是在构造函数时就传入的了。
而配置这个池的参数都是在pooleddatasource
中:
官方文档:
poolmaximumactiveconnections – 在任意时间可以存在的活动(也就是正在使用)连接数量,默认值:10
poolmaximumidleconnections – 任意时间可能存在的空闲连接数。
poolmaximumcheckouttime – 在被强制返回之前,池中连接被检出(checked out)时间,默认值:20000 毫秒(即 20 秒)
pooltimetowait – 这是一个底层设置,如果获取连接花费了相当长的时间,连接池会打印状态日志并重新尝试获取一个连接(避免在误配置的情况下一直安静的失败),默认值:20000 毫秒(即 20 秒)。
poolmaximumlocalbadconnectiontolerance – 这是一个关于坏连接容忍度的底层设置, 作用于每一个尝试从缓存池获取连接的线程。 如果这个线程获取到的是一个坏的连接,那么这个数据源允许这个线程尝试重新获取一个新的连接,但是这个重新尝试的次数不应该超过 poolmaximumidleconnections 与 poolmaximumlocalbadconnectiontolerance 之和。 默认值:3 (新增于 3.4.5)
poolpingquery – 发送到数据库的侦测查询,用来检验连接是否正常工作并准备接受请求。默认是“no ping query set”,这会导致多数数据库驱动失败时带有一个恰当的错误消息。
poolpingenabled – 是否启用侦测查询。若开启,需要设置 poolpingquery 属性为一个可执行的 sql 语句(最好是一个速度非常快的 sql 语句),默认值:false。
poolpingconnectionsnotusedfor – 配置 poolpingquery 的频率。可以被设置为和数据库连接超时时间一样,来避免不必要的侦测,默认值:0(即所有连接每一时刻都被侦测 — 当然仅当 poolpingenabled 为 true 时适用)。
pooleddatasource
pooleddatasource
完成池功能的类,直接看拿连接的popconnection
方法:
private pooledconnection popconnection(string username, string password) throws sqlexception { boolean countedwait = false; pooledconnection conn = null; // 触发获取连接的当前时间 long t = system.currenttimemillis(); int localbadconnectioncount = 0; while (conn == null) { // 同步 synchronized (state) { // 判断空闲列表中是否可以提供连接 if (!state.idleconnections.isempty()) { // pool has available connection conn = state.idleconnections.remove(0); if (log.isdebugenabled()) { log.debug("checked out connection " + conn.getrealhashcode() + " from pool."); } } else { // pool does not have available connection // 判断是否达到最大连接数限制 if (state.activeconnections.size() < poolmaximumactiveconnections) { // can create new connection conn = new pooledconnection(datasource.getconnection(), this); if (log.isdebugenabled()) { log.debug("created connection " + conn.getrealhashcode() + "."); } } else { // cannot create new connection pooledconnection oldestactiveconnection = state.activeconnections.get(0); long longestcheckouttime = oldestactiveconnection.getcheckouttime(); // 判断最老一个连接使用时间是否超过最大值 if (longestcheckouttime > poolmaximumcheckouttime) { // can claim overdue connection state.claimedoverdueconnectioncount++; state.accumulatedcheckouttimeofoverdueconnections += longestcheckouttime; state.accumulatedcheckouttime += longestcheckouttime; state.activeconnections.remove(oldestactiveconnection); if (!oldestactiveconnection.getrealconnection().getautocommit()) { try { oldestactiveconnection.getrealconnection().rollback(); } catch (sqlexception e) { /* just log a message for debug and continue to execute the following statement like nothing happend. wrap the bad connection with a new pooledconnection, this will help to not intterupt current executing thread and give current thread a chance to join the next competion for another valid/good database connection. at the end of this loop, bad {@link @conn} will be set as null. */ log.debug("bad connection. could not roll back"); } } // 这里看到将包装在oldestactiveconnection中的realconnection重新用pooledconnection包装出来直接使用,看前面操作是将连接进行回滚,但是可能失败,却不关心,注释解释是,在后面的代码中会进行isvalid的判断,其中就会判断连接是否可用。 conn = new pooledconnection(oldestactiveconnection.getrealconnection(), this); conn.setcreatedtimestamp(oldestactiveconnection.getcreatedtimestamp()); conn.setlastusedtimestamp(oldestactiveconnection.getlastusedtimestamp()); // 将老连接设置成invalid oldestactiveconnection.invalidate(); if (log.isdebugenabled()) { log.debug("claimed overdue connection " + conn.getrealhashcode() + "."); } } else { // must wait try { if (!countedwait) { state.hadtowaitcount++; countedwait = true; } if (log.isdebugenabled()) { log.debug("waiting as long as " + pooltimetowait + " milliseconds for connection."); } long wt = system.currenttimemillis(); // 线程等待,也释放了锁 state.wait(pooltimetowait); state.accumulatedwaittime += system.currenttimemillis() - wt; } catch (interruptedexception e) { break; } } } } if (conn != null) { // ping to server and check the connection is valid or not if (conn.isvalid()) { if (!conn.getrealconnection().getautocommit()) { conn.getrealconnection().rollback(); } conn.setconnectiontypecode(assembleconnectiontypecode(datasource.geturl(), username, password)); conn.setcheckouttimestamp(system.currenttimemillis()); conn.setlastusedtimestamp(system.currenttimemillis()); state.activeconnections.add(conn); state.requestcount++; state.accumulatedrequesttime += system.currenttimemillis() - t; } else { if (log.isdebugenabled()) { log.debug("a bad connection (" + conn.getrealhashcode() + ") was returned from the pool, getting another connection."); } state.badconnectioncount++; localbadconnectioncount++; // 不可用的连接会被设置成null,被回收器回收 conn = null; if (localbadconnectioncount > (poolmaximumidleconnections + poolmaximumlocalbadconnectiontolerance)) { if (log.isdebugenabled()) { log.debug("pooleddatasource: could not get a good connection to the database."); } throw new sqlexception("pooleddatasource: could not get a good connection to the database."); } } } } } if (conn == null) { if (log.isdebugenabled()) { log.debug("pooleddatasource: unknown severe error condition. the connection pool returned a null connection."); } throw new sqlexception("pooleddatasource: unknown severe error condition. the connection pool returned a null connection."); } return conn; }
popconnection
方法实现在一个池中获取连接的基本逻辑,依赖最大连接数,获取等待时间,连接使用超时时间等来完成一个池的核心能力。
注意这里使用wait
方法来等待,在java线程池中使用阻塞队列来出来暂时拿不到资源的请求。
前面提到,在使用connection
时,调用close
方法,会调用到datasource.pushconnection(this);
,就是将这个连接使用完了还回池的动作:
protected void pushconnection(pooledconnection conn) throws sqlexception { // 一样加锁 synchronized (state) { // 从使用线程列表中删除 state.activeconnections.remove(conn); if (conn.isvalid()) { // 判断空闲连接列表是否超过最大值 if (state.idleconnections.size() < poolmaximumidleconnections && conn.getconnectiontypecode() == expectedconnectiontypecode) { state.accumulatedcheckouttime += conn.getcheckouttime(); if (!conn.getrealconnection().getautocommit()) { conn.getrealconnection().rollback(); } pooledconnection newconn = new pooledconnection(conn.getrealconnection(), this); // 加入到空闲连接列表中 state.idleconnections.add(newconn); newconn.setcreatedtimestamp(conn.getcreatedtimestamp()); newconn.setlastusedtimestamp(conn.getlastusedtimestamp()); conn.invalidate(); if (log.isdebugenabled()) { log.debug("returned connection " + newconn.getrealhashcode() + " to pool."); } // 通知等待线程 state.notifyall(); } else { state.accumulatedcheckouttime += conn.getcheckouttime(); if (!conn.getrealconnection().getautocommit()) { conn.getrealconnection().rollback(); } conn.getrealconnection().close(); if (log.isdebugenabled()) { log.debug("closed connection " + conn.getrealhashcode() + "."); } conn.invalidate(); } } else { if (log.isdebugenabled()) { log.debug("a bad connection (" + conn.getrealhashcode() + ") attempted to return to the pool, discarding connection."); } state.badconnectioncount++; } } }
归还连接时,需要查看空闲列表中的线程数量是否已经到到设置的最大值,如果已经达到,就不需要归还了,凡是需要加入空闲列表的都需要进行notifyall
操作,来通知那些等待的线程来抢这个归还的连接,但是如果此时连接池中空闲连接充足,并没有线程等待,这个操作也就浪费了,所以可以思考前面popconnection
中的wait
和这里的notifyall
是可以用等待队列来完成。
另外一个方法,用于判断连接是否可用:
protected boolean pingconnection(pooledconnection conn) { boolean result = true; try { // 先用isclosed来获取结果 result = !conn.getrealconnection().isclosed(); } catch (sqlexception e) { if (log.isdebugenabled()) { log.debug("connection " + conn.getrealhashcode() + " is bad: " + e.getmessage()); } result = false; } if (result) { // 可以通过poolpingenabled配置来决定是否使用自定义sql if (poolpingenabled) { if (poolpingconnectionsnotusedfor >= 0 && conn.gettimeelapsedsincelastuse() > poolpingconnectionsnotusedfor) { try { if (log.isdebugenabled()) { log.debug("testing connection " + conn.getrealhashcode() + " ..."); } connection realconn = conn.getrealconnection(); statement statement = realconn.createstatement(); // 执行poolpingquery resultset rs = statement.executequery(poolpingquery); rs.close(); statement.close(); if (!realconn.getautocommit()) { realconn.rollback(); } result = true; if (log.isdebugenabled()) { log.debug("connection " + conn.getrealhashcode() + " is good!"); } } catch (exception e) { log.warn("execution of ping query '" + poolpingquery + "' failed: " + e.getmessage()); try { conn.getrealconnection().close(); } catch (exception e2) { //ignore } result = false; if (log.isdebugenabled()) { log.debug("connection " + conn.getrealhashcode() + " is bad: " + e.getmessage()); } } } } } return result; }
从代码中可以看到isclosed
方法并不可靠,最终还是通过执行sql来判断连接是否可用,这个在很多涉及判断数据库连接是否有效的地方都是这么做的,详细可以看一下isclosed
方法的注释。
pooleddatasourcefactory
继承unpooleddatasourcefactory,直接返回pooleddatasource对象
public class pooleddatasourcefactory extends unpooleddatasourcefactory { public pooleddatasourcefactory() { this.datasource = new pooleddatasource(); } }
心得
在整个线程池的实现代码中,可以学习到一个池的实现的要素有哪些,以及录用基础代码如何实现一个池。对于那些封装成高层次的池的代码来说,这个实现显得又些单薄和不够全面,可是无论连接池如何实现核心池的实现逻辑是不会变的。
上一篇: 论酒后失态的行为
下一篇: win8不显示图片缩略图的解决方法
推荐阅读
-
[Go] gocron源码阅读-判断是否使用root用户执行
-
Mybaits 源码解析 (六)----- 全网最详细:Select 语句的执行过程分析(上篇)(Mapper方法是如何调用到XML中的SQL的?)
-
持久层框架JPA与Mybatis该如何选型
-
长期作业:web框架源码剖析
-
一个可分页的基于文本的PHP留言板源码第1/2页
-
jQuery选择器源码解读(四):tokenize方法的Expr.preFilter
-
jQuery选择器源码解读(二):select方法
-
jQuery选择器源码解读(一):Sizzle方法
-
jQuery选择器源码解读(三):tokenize方法
-
datax(七)源码阅读之运行时监控MXBean