Mybaits 源码解析 (七)----- Select 语句的执行过程分析(下篇)全网最详细,没有之一
我们上篇文章讲到了查询方法里面的doquery方法,这里面就是调用jdbc的api了,其中的逻辑比较复杂,我们这边文章来讲,先看看我们上篇文章分析的地方
simpleexecutor
1 public <e> list<e> doquery(mappedstatement ms, object parameter, rowbounds rowbounds, resulthandler resulthandler, boundsql boundsql) throws sqlexception { 2 statement stmt = null; 3 try { 4 configuration configuration = ms.getconfiguration(); 5 // 创建 statementhandler 6 statementhandler handler = configuration.newstatementhandler(wrapper, ms, parameter, rowbounds, resulthandler, boundsql); 7 // 创建 statement 8 stmt = preparestatement(handler, ms.getstatementlog()); 9 // 执行查询操作 10 return handler.<e>query(stmt, resulthandler); 11 } finally { 12 // 关闭 statement 13 closestatement(stmt); 14 } 15 }
上篇文章我们分析完了第6行代码,在第6行处我们创建了一个preparedstatementhandler,我们要接着第8行代码开始分析,也就是创建 statement,先不忙着分析,我们先来回顾一下 ,我们以前是怎么使用jdbc的
jdbc
public class login { /** * 第一步,加载驱动,创建数据库的连接 * 第二步,编写sql * 第三步,需要对sql进行预编译 * 第四步,向sql里面设置参数 * 第五步,执行sql * 第六步,释放资源 * @throws exception */ public static final string url = "jdbc:mysql://localhost:3306/chenhao"; public static final string user = "liulx"; public static final string password = "123456"; public static void main(string[] args) throws exception { login("lucy","123"); } public static void login(string username , string password) throws exception{ connection conn = null; preparedstatement psmt = null; resultset rs = null; try { //加载驱动程序 class.forname("com.mysql.jdbc.driver"); //获得数据库连接 conn = drivermanager.getconnection(url, user, password); //编写sql string sql = "select * from user where name =? and password = ?";//问号相当于一个占位符 //对sql进行预编译 psmt = conn.preparestatement(sql); //设置参数 psmt.setstring(1, username); psmt.setstring(2, password); //执行sql ,返回一个结果集 rs = psmt.executequery(); //输出结果 while(rs.next()){ system.out.println(rs.getstring("user_name")+" 年龄:"+rs.getint("age")); } } catch (exception e) { e.printstacktrace(); }finally{ //释放资源 conn.close(); psmt.close(); rs.close(); } } }
上面代码中注释已经很清楚了,我们来看看mybatis中是怎么和数据库打交道的。
simpleexecutor
private statement preparestatement(statementhandler handler, log statementlog) throws sqlexception { statement stmt; // 获取数据库连接 connection connection = getconnection(statementlog); // 创建 statement, stmt = handler.prepare(connection, transaction.gettimeout()); // 为 statement 设置参数 handler.parameterize(stmt); return stmt; }
在上面的代码中我们终于看到了和jdbc相关的内容了,大概分为下面三个步骤:
- 获取数据库连接
- 创建preparedstatement
- 为preparedstatement设置运行时参数
我们先来看看获取数据库连接,跟进代码看看
baseexecutor
protected connection getconnection(log statementlog) throws sqlexception { //通过transaction来获取connection connection connection = this.transaction.getconnection(); return statementlog.isdebugenabled() ? connectionlogger.newinstance(connection, statementlog, this.querystack) : connection; }
我们看到是通过executor中的transaction属性来获取connection,那我们就先来看看transaction,根据前面的文章中的配置
<
transactionmanager
type="jdbc"/>,
则mybatis会创建一个jdbctransactionfactory.class 实例,executor中的transaction是一个jdbctransaction.class 实例,其实现transaction接口,那我们先来看看transaction
jdbctransaction
我们先来看看其接口transaction
transaction
public interface transaction { //获取数据库连接 connection getconnection() throws sqlexception; //提交事务 void commit() throws sqlexception; //回滚事务 void rollback() throws sqlexception; //关闭事务 void close() throws sqlexception; //获取超时时间 integer gettimeout() throws sqlexception; }
接着我们看看其实现类jdbctransaction
jdbctransaction
public class jdbctransaction implements transaction { private static final log log = logfactory.getlog(jdbctransaction.class); //数据库连接 protected connection connection; //数据源信息 protected datasource datasource; //隔离级别 protected transactionisolationlevel level; //是否为自动提交 protected boolean autocommmit; public jdbctransaction(datasource ds, transactionisolationlevel desiredlevel, boolean desiredautocommit) { datasource = ds; level = desiredlevel; autocommmit = desiredautocommit; } public jdbctransaction(connection connection) { this.connection = connection; } public connection getconnection() throws sqlexception { //如果事务中不存在connection,则获取一个connection并放入connection属性中 //第一次肯定为空 if (connection == null) { openconnection(); } //如果事务中已经存在connection,则直接返回这个connection return connection; } /** * commit()功能 * @throws sqlexception */ public void commit() throws sqlexception { if (connection != null && !connection.getautocommit()) { if (log.isdebugenabled()) { log.debug("committing jdbc connection [" + connection + "]"); } //使用connection的commit() connection.commit(); } } /** * rollback()功能 * @throws sqlexception */ public void rollback() throws sqlexception { if (connection != null && !connection.getautocommit()) { if (log.isdebugenabled()) { log.debug("rolling back jdbc connection [" + connection + "]"); } //使用connection的rollback() connection.rollback(); } } /** * close()功能 * @throws sqlexception */ public void close() throws sqlexception { if (connection != null) { resetautocommit(); if (log.isdebugenabled()) { log.debug("closing jdbc connection [" + connection + "]"); } //使用connection的close() connection.close(); } } protected void openconnection() throws sqlexception { if (log.isdebugenabled()) { log.debug("opening jdbc connection"); } //通过datasource来获取connection,并设置到transaction的connection属性中 connection = datasource.getconnection(); if (level != null) { //通过connection设置事务的隔离级别 connection.settransactionisolation(level.getlevel()); } //设置事务是否自动提交 setdesiredautocommit(autocommmit); } protected void setdesiredautocommit(boolean desiredautocommit) { try { if (this.connection.getautocommit() != desiredautocommit) { if (log.isdebugenabled()) { log.debug("setting autocommit to " + desiredautocommit + " on jdbc connection [" + this.connection + "]"); } //通过connection设置事务是否自动提交 this.connection.setautocommit(desiredautocommit); } } catch (sqlexception var3) { throw new transactionexception("error configuring autocommit. your driver may not support getautocommit() or setautocommit(). requested setting: " + desiredautocommit + ". cause: " + var3, var3); } } }
我们看到jdbctransaction中有一个connection属性和datasource属性,使用connection来进行提交、回滚、关闭等操作,也就是说jdbctransaction其实只是在jdbc的connection上面封装了一下,实际使用的其实还是jdbc的事务。我们看看getconnection()方法
//数据库连接 protected connection connection; //数据源信息 protected datasource datasource; public connection getconnection() throws sqlexception { //如果事务中不存在connection,则获取一个connection并放入connection属性中 //第一次肯定为空 if (connection == null) { openconnection(); } //如果事务中已经存在connection,则直接返回这个connection return connection; } protected void openconnection() throws sqlexception { if (log.isdebugenabled()) { log.debug("opening jdbc connection"); } //通过datasource来获取connection,并设置到transaction的connection属性中 connection = datasource.getconnection(); if (level != null) { //通过connection设置事务的隔离级别 connection.settransactionisolation(level.getlevel()); } //设置事务是否自动提交 setdesiredautocommit(autocommmit); }
先是判断当前事务中是否存在connection,如果存在,则直接返回connection,如果不存在则通过datasource来获取connection,这里我们明白了一点,如果当前事务没有关闭,也就是没有释放connection,那么在同一个transaction中使用的是同一个connection,我们再来想想,transaction是simpleexecutor中的属性,simpleexecutor又是sqlsession中的属性,那我们可以这样说,同一个sqlsession中只有一个simpleexecutor,simpleexecutor中有一个transaction,transaction有一个connection。我们来看看如下例子
public static void main(string[] args) throws ioexception { string resource = "mybatis-config.xml"; inputstream inputstream = resources.getresourceasstream(resource); sqlsessionfactory sqlsessionfactory = new sqlsessionfactorybuilder().build(inputstream); //创建一个sqlsession sqlsession sqlsession = sqlsessionfactory.opensession(); try { employeemapper employeemapper = sqlsession.getmapper(employee.class); usermapper usermapper = sqlsession.getmapper(user.class); list<employee> allemployee = employeemapper.getall(); list<user> alluser = usermapper.getall(); employee employee = employeemapper.getone(); } finally { sqlsession.close(); } }
我们看到同一个sqlsession可以获取多个mapper代理对象,则多个mapper代理对象中的sqlsession引用应该是同一个,那么多个mapper代理对象调用方法应该是同一个connection,直到调用close(),所以说我们的sqlsession是线程不安全的,如果所有的业务都使用一个sqlsession,那connection也是同一个,一个业务执行完了就将其关闭,那其他的业务还没执行完呢。大家明白了吗?我们回归到源码,connection = datasource.getconnection();,最终还是调用datasource来获取连接,那我们是不是要来看看datasource呢?
我们还是从前面的配置文件来看<datasource type="unpooled|pooled">,这里有unpooled和pooled两种datasource,一种是使用连接池,一种是普通的datasource,unpooled将会创将new unpooleddatasource()实例,pooled将会new pooleddatasource()实例,都实现datasource接口,那我们先来看看datasource接口
datasource
public interface datasource extends commondatasource,wrapper { //获取数据库连接 connection getconnection() throws sqlexception; connection getconnection(string username, string password) throws sqlexception; }
很简单,只有一个获取数据库连接的接口,那我们来看看其实现类
unpooleddatasource
unpooleddatasource,从名称上即可知道,该种数据源不具有池化特性。该种数据源每次会返回一个新的数据库连接,而非复用旧的连接。其核心的方法有三个,分别如下:
- initializedriver - 初始化数据库驱动
- dogetconnection - 获取数据连接
- configureconnection - 配置数据库连接
初始化数据库驱动
看下我们上面使用jdbc的例子,在执行 sql 之前,通常都是先获取数据库连接。一般步骤都是加载数据库驱动,然后通过 drivermanager 获取数据库连接。unpooleddatasource 也是使用 jdbc 访问数据库的,因此它获取数据库连接的过程一样
unpooleddatasource
public class unpooleddatasource implements datasource { private classloader driverclassloader; private properties driverproperties; private static map<string, driver> registereddrivers = new concurrenthashmap(); private string driver; private string url; private string username; private string password; private boolean autocommit; private integer defaulttransactionisolationlevel; public unpooleddatasource() { } public unpooleddatasource(string driver, string url, string username, string password) { this.driver = driver; this.url = url; this.username = username; this.password = password; } private synchronized void initializedriver() throws sqlexception { // 检测当前 driver 对应的驱动实例是否已经注册 if (!registereddrivers.containskey(driver)) { class<?> drivertype; try { // 加载驱动类型 if (driverclassloader != null) { // 使用 driverclassloader 加载驱动 drivertype = class.forname(driver, true, driverclassloader); } else { // 通过其他 classloader 加载驱动 drivertype = resources.classforname(driver); } // 通过反射创建驱动实例 driver driverinstance = (driver) drivertype.newinstance(); /* * 注册驱动,注意这里是将 driver 代理类 driverproxy 对象注册到 drivermanager 中的,而非 driver 对象本身。 */ drivermanager.registerdriver(new driverproxy(driverinstance)); // 缓存驱动类名和实例,防止多次注册 registereddrivers.put(driver, driverinstance); } catch (exception e) { throw new sqlexception("error setting driver on unpooleddatasource. cause: " + e); } } } //略... } //drivermanager private final static copyonwritearraylist<driverinfo> registereddrivers = new copyonwritearraylist<driverinfo>(); public static synchronized void registerdriver(java.sql.driver driver) throws sqlexception { if(driver != null) { registereddrivers.addifabsent(new driverinfo(driver)); } else { // this is for compatibility with the original drivermanager throw new nullpointerexception(); } }
通过反射机制加载驱动driver,并将其注册到drivermanager中的一个常量集合中,供后面获取连接时使用,为什么这里是一个list呢?我们实际开发中有可能使用到了多种数据库类型,如mysql、oracle等,其驱动都是不同的,不同的数据源获取连接时使用的是不同的驱动。
在我们使用jdbc的时候,也没有通过drivermanager.registerdriver(new driverproxy(driverinstance));去注册driver啊,如果我们使用的是mysql数据源,那我们来看class.forname("com.mysql.jdbc.driver");这句代码发生了什么
class.forname主要是做了什么呢?它主要是要求jvm查找并装载指定的类。这样我们的类com.mysql.jdbc.driver就被装载进来了。而且在类被装载进jvm的时候,它的静态方法就会被执行。我们来看com.mysql.jdbc.driver的实现代码。在它的实现里有这么一段代码:
static { try { java.sql.drivermanager.registerdriver(new driver()); } catch (sqlexception e) { throw new runtimeexception("can't register driver!"); } }
很明显,这里使用了drivermanager并将该类给注册上去了。所以,对于任何实现前面driver接口的类,只要在他们被装载进jvm的时候注册drivermanager就可以实现被后续程序使用。
作为那些被加载的driver实现,他们本身在被装载时会在执行的static代码段里通过调用drivermanager.registerdriver()来把自身注册到drivermanager的registereddrivers列表中。这样后面就可以通过得到的driver来取得连接了。
获取数据库连接
在上面例子中使用 jdbc 时,我们都是通过 drivermanager 的接口方法获取数据库连接。我们来看看unpooleddatasource是如何获取的。
unpooleddatasource
public connection getconnection() throws sqlexception { return dogetconnection(username, password); } private connection dogetconnection(string username, string password) throws sqlexception { properties props = new properties(); if (driverproperties != null) { props.putall(driverproperties); } if (username != null) { // 存储 user 配置 props.setproperty("user", username); } if (password != null) { // 存储 password 配置 props.setproperty("password", password); } // 调用重载方法 return dogetconnection(props); } private connection dogetconnection(properties properties) throws sqlexception { // 初始化驱动,我们上一节已经讲过了,只用初始化一次 initializedriver(); // 获取连接 connection connection = drivermanager.getconnection(url, properties); // 配置连接,包括自动提交以及事务等级 configureconnection(connection); return connection; } private void configureconnection(connection conn) throws sqlexception { if (autocommit != null && autocommit != conn.getautocommit()) { // 设置自动提交 conn.setautocommit(autocommit); } if (defaulttransactionisolationlevel != null) { // 设置事务隔离级别 conn.settransactionisolation(defaulttransactionisolationlevel); } }
上面方法将一些配置信息放入到 properties 对象中,然后将数据库连接和 properties 对象传给 drivermanager 的 getconnection 方法即可获取到数据库连接。我们来看看是怎么获取数据库连接的
private static connection getconnection(string url, java.util.properties info, class<?> caller) throws sqlexception { // 获取类加载器 classloader callercl = caller != null ? caller.getclassloader() : null; synchronized(drivermanager.class) { if (callercl == null) { callercl = thread.currentthread().getcontextclassloader(); } } // 此处省略部分代码 // 这里遍历的是在registerdriver(driver driver)方法中注册的驱动对象 // 每个driverinfo包含了驱动对象和其信息 for(driverinfo adriver : registereddrivers) { // 判断是否为当前线程类加载器加载的驱动类 if(isdriverallowed(adriver.driver, callercl)) { try { println("trying " + adriver.driver.getclass().getname()); // 获取连接对象,这里调用了driver的父类的方法 // 如果这里有多个driverinfo,比喻mysql和oracle的driver都注册registereddrivers了 // 这里所有的driver都会尝试使用url和info去连接,哪个连接上了就返回 // 会不会所有的都会连接上呢?不会,因为url的写法不同,不同的driver会判断url是否适合当前驱动 connection con = adriver.driver.connect(url, info); if (con != null) { // 打印连接成功信息 println("getconnection returning " + adriver.driver.getclass().getname()); // 返回连接对像 return (con); } } catch (sqlexception ex) { if (reason == null) { reason = ex; } } } else { println(" skipping: " + adriver.getclass().getname()); } } }
代码中循环所有注册的驱动,然后通过驱动进行连接,所有的驱动都会尝试连接,但是不同的驱动,连接的url是不同的,如mysql的url是jdbc:mysql://localhost:3306/chenhao,以jdbc:mysql://开头,则其mysql的驱动肯定会判断获取连接的url符合,oracle的也类似,我们来看看mysql的驱动获取连接
由于篇幅原因,我这里就不分析了,大家有兴趣的可以看看,最后由url对应的驱动获取到connection返回,好了我们再来看看下一种datasource
pooleddatasource
pooleddatasource 内部实现了连接池功能,用于复用数据库连接。因此,从效率上来说,pooleddatasource 要高于 unpooleddatasource。但是最终获取connection还是通过unpooleddatasource,只不过pooleddatasource 提供一个存储connection的功能。
辅助类介绍
pooleddatasource 需要借助两个辅助类帮其完成功能,这两个辅助类分别是 poolstate 和 pooledconnection。poolstate 用于记录连接池运行时的状态,比如连接获取次数,无效连接数量等。同时 poolstate 内部定义了两个 pooledconnection 集合,用于存储空闲连接和活跃连接。pooledconnection 内部定义了一个 connection 类型的变量,用于指向真实的数据库连接。以及一个 connection 的代理类,用于对部分方法调用进行拦截。至于为什么要拦截,随后将进行分析。除此之外,pooledconnection 内部也定义了一些字段,用于记录数据库连接的一些运行时状态。接下来,我们来看一下 pooledconnection 的定义。
pooledconnection
class pooledconnection implements invocationhandler { private static final string close = "close"; private static final class<?>[] ifaces = new class<?>[]{connection.class}; private final int hashcode; private final pooleddatasource datasource; // 真实的数据库连接 private final connection realconnection; // 数据库连接代理 private final connection proxyconnection; // 从连接池中取出连接时的时间戳 private long checkouttimestamp; // 数据库连接创建时间 private long createdtimestamp; // 数据库连接最后使用时间 private long lastusedtimestamp; // connectiontypecode = (url + username + password).hashcode() private int connectiontypecode; // 表示连接是否有效 private boolean valid; public pooledconnection(connection connection, pooleddatasource datasource) { this.hashcode = connection.hashcode(); this.realconnection = connection; this.datasource = datasource; this.createdtimestamp = system.currenttimemillis(); this.lastusedtimestamp = system.currenttimemillis(); this.valid = true; // 创建 connection 的代理类对象 this.proxyconnection = (connection) proxy.newproxyinstance(connection.class.getclassloader(), ifaces, this); } @override public object invoke(object proxy, method method, object[] args) throws throwable {...} // 省略部分代码 }
下面再来看看 poolstate 的定义。
poolstate
public class poolstate { protected pooleddatasource datasource; // 空闲连接列表 protected final list<pooledconnection> idleconnections = new arraylist<pooledconnection>(); // 活跃连接列表 protected final list<pooledconnection> activeconnections = new arraylist<pooledconnection>(); // 从连接池中获取连接的次数 protected long requestcount = 0; // 请求连接总耗时(单位:毫秒) protected long accumulatedrequesttime = 0; // 连接执行时间总耗时 protected long accumulatedcheckouttime = 0; // 执行时间超时的连接数 protected long claimedoverdueconnectioncount = 0; // 超时时间累加值 protected long accumulatedcheckouttimeofoverdueconnections = 0; // 等待时间累加值 protected long accumulatedwaittime = 0; // 等待次数 protected long hadtowaitcount = 0; // 无效连接数 protected long badconnectioncount = 0; }
大家记住上面的空闲连接列表和活跃连接列表
获取连接
前面已经说过,pooleddatasource 会将用过的连接进行回收,以便可以复用连接。因此从 pooleddatasource 获取连接时,如果空闲链接列表里有连接时,可直接取用。那如果没有空闲连接怎么办呢?此时有两种解决办法,要么创建新连接,要么等待其他连接完成任务。
pooleddatasource
public class pooleddatasource implements datasource { private static final log log = logfactory.getlog(pooleddatasource.class); //这里有辅助类poolstate private final poolstate state = new poolstate(this); //还有一个unpooleddatasource属性,其实真正获取connection是由unpooleddatasource来完成的 private final unpooleddatasource datasource; protected int poolmaximumactiveconnections = 10; protected int poolmaximumidleconnections = 5; protected int poolmaximumcheckouttime = 20000; protected int pooltimetowait = 20000; protected string poolpingquery = "no ping query set"; protected boolean poolpingenabled = false; protected int poolpingconnectionsnotusedfor = 0; private int expectedconnectiontypecode; public pooleddatasource() { this.datasource = new unpooleddatasource(); } public pooleddatasource(string driver, string url, string username, string password) { //构造器中创建unpooleddatasource对象 this.datasource = new unpooleddatasource(driver, url, username, password); } public connection getconnection() throws sqlexception { return this.popconnection(this.datasource.getusername(), this.datasource.getpassword()).getproxyconnection(); } private pooledconnection popconnection(string username, string password) throws sqlexception { boolean countedwait = false; pooledconnection conn = null; long t = system.currenttimemillis(); int localbadconnectioncount = 0; while (conn == null) { synchronized (state) { // 检测空闲连接集合(idleconnections)是否为空 if (!state.idleconnections.isempty()) { // idleconnections 不为空,表示有空闲连接可以使用,直接从空闲连接集合中取出一个连接 conn = state.idleconnections.remove(0); } else { /* * 暂无空闲连接可用,但如果活跃连接数还未超出限制 *(poolmaximumactiveconnections),则可创建新的连接 */ if (state.activeconnections.size() < poolmaximumactiveconnections) { // 创建新连接,看到没,还是通过datasource获取连接,也就是unpooleddatasource获取连接 conn = new pooledconnection(datasource.getconnection(), this); } else { // 连接池已满,不能创建新连接 // 取出运行时间最长的连接 pooledconnection oldestactiveconnection = state.activeconnections.get(0); // 获取运行时长 long longestcheckouttime = oldestactiveconnection.getcheckouttime(); // 检测运行时长是否超出限制,即超时 if (longestcheckouttime > poolmaximumcheckouttime) { // 累加超时相关的统计字段 state.claimedoverdueconnectioncount++; state.accumulatedcheckouttimeofoverdueconnections += longestcheckouttime; state.accumulatedcheckouttime += longestcheckouttime; // 从活跃连接集合中移除超时连接 state.activeconnections.remove(oldestactiveconnection); // 若连接未设置自动提交,此处进行回滚操作 if (!oldestactiveconnection.getrealconnection().getautocommit()) { try { oldestactiveconnection.getrealconnection().rollback(); } catch (sqlexception e) {...} } /* * 创建一个新的 pooledconnection,注意, * 此处复用 oldestactiveconnection 的 realconnection 变量 */ conn = new pooledconnection(oldestactiveconnection.getrealconnection(), this); /* * 复用 oldestactiveconnection 的一些信息,注意 pooledconnection 中的 * createdtimestamp 用于记录 connection 的创建时间,而非 pooledconnection * 的创建时间。所以这里要复用原连接的时间信息。 */ conn.setcreatedtimestamp(oldestactiveconnection.getcreatedtimestamp()); conn.setlastusedtimestamp(oldestactiveconnection.getlastusedtimestamp()); // 设置连接为无效状态 oldestactiveconnection.invalidate(); } else {// 运行时间最长的连接并未超时 try { if (!countedwait) { state.hadtowaitcount++; countedwait = true; } long wt = system.currenttimemillis(); // 当前线程进入等待状态 state.wait(pooltimetowait); state.accumulatedwaittime += system.currenttimemillis() - wt; } catch (interruptedexception e) { break; } } } } if (conn != null) { if (conn.isvalid()) { if (!conn.getrealconnection().getautocommit()) { // 进行回滚操作 conn.getrealconnection().rollback(); } conn.setconnectiontypecode(assembleconnectiontypecode(datasource.geturl(), username, password)); // 设置统计字段 conn.setcheckouttimestamp(system.currenttimemillis()); conn.setlastusedtimestamp(system.currenttimemillis()); state.activeconnections.add(conn); state.requestcount++; state.accumulatedrequesttime += system.currenttimemillis() - t; } else { // 连接无效,此时累加无效连接相关的统计字段 state.badconnectioncount++; localbadconnectioncount++; conn = null; if (localbadconnectioncount > (poolmaximumidleconnections + poolmaximumlocalbadconnectiontolerance)) { throw new sqlexception(...); } } } } } if (conn == null) { throw new sqlexception(...); } return conn; } }
从连接池中获取连接首先会遇到两种情况:
- 连接池中有空闲连接
- 连接池中无空闲连接
对于第一种情况,把连接取出返回即可。对于第二种情况,则要进行细分,会有如下的情况。
- 活跃连接数没有超出最大活跃连接数
- 活跃连接数超出最大活跃连接数
对于上面两种情况,第一种情况比较好处理,直接创建新的连接即可。至于第二种情况,需要再次进行细分。
- 活跃连接的运行时间超出限制,即超时了
- 活跃连接未超时
对于第一种情况,我们直接将超时连接强行中断,并进行回滚,然后复用部分字段重新创建 pooledconnection 即可。对于第二种情况,目前没有更好的处理方式了,只能等待了。
回收连接
相比于获取连接,回收连接的逻辑要简单的多。回收连接成功与否只取决于空闲连接集合的状态,所需处理情况很少,因此比较简单。
我们还是来看看
public connection getconnection() throws sqlexception { return this.popconnection(this.datasource.getusername(), this.datasource.getpassword()).getproxyconnection(); }
返回的是pooledconnection的一个代理类,为什么不直接使用pooledconnection的realconnection呢?我们可以看下pooledconnection这个类
class pooledconnection implements invocationhandler {
很熟悉是吧,标准的代理类用法,看下其invoke方法
pooledconnection
@override public object invoke(object proxy, method method, object[] args) throws throwable { string methodname = method.getname(); // 重点在这里,如果调用了其close方法,则实际执行的是将连接放回连接池的操作 if (close.hashcode() == methodname.hashcode() && close.equals(methodname)) { datasource.pushconnection(this); return null; } else { try { if (!object.class.equals(method.getdeclaringclass())) { // issue #579 tostring() should never fail // throw an sqlexception instead of a runtime checkconnection(); } // 其他的操作都交给realconnection执行 return method.invoke(realconnection, args); } catch (throwable t) { throw exceptionutil.unwrapthrowable(t); } } }
那我们来看看pushconnection做了什么
protected void pushconnection(pooledconnection conn) throws sqlexception { synchronized (state) { // 从活跃连接池中移除连接 state.activeconnections.remove(conn); if (conn.isvalid()) { // 空闲连接集合未满 if (state.idleconnections.size() < poolmaximumidleconnections && conn.getconnectiontypecode() == expectedconnectiontypecode) { state.accumulatedcheckouttime += conn.getcheckouttime(); // 回滚未提交的事务 if (!conn.getrealconnection().getautocommit()) { conn.getrealconnection().rollback(); } // 创建新的 pooledconnection pooledconnection newconn = new pooledconnection(conn.getrealconnection(), this); state.idleconnections.add(newconn); // 复用时间信息 newconn.setcreatedtimestamp(conn.getcreatedtimestamp()); newconn.setlastusedtimestamp(conn.getlastusedtimestamp()); // 将原连接置为无效状态 conn.invalidate(); // 通知等待的线程 state.notifyall(); } else {// 空闲连接集合已满 state.accumulatedcheckouttime += conn.getcheckouttime(); // 回滚未提交的事务 if (!conn.getrealconnection().getautocommit()) { conn.getrealconnection().rollback(); } // 关闭数据库连接 conn.getrealconnection().close(); conn.invalidate(); } } else { state.badconnectioncount++; } } }
先将连接从活跃连接集合中移除,如果空闲集合未满,此时复用原连接的字段信息创建新的连接,并将其放入空闲集合中即可;若空闲集合已满,此时无需回收连接,直接关闭即可。
连接池总觉得很神秘,但仔细分析完其代码之后,也就没那么神秘了,就是将连接使用完之后放到一个集合中,下面再获取连接的时候首先从这个集合中获取。 还有pooledconnection的代理模式的使用,值得我们学习
好了,我们已经获取到了数据库连接,接下来要创建preparestatement了,我们上面jdbc的例子是怎么获取的? psmt = conn.preparestatement(sql);,直接通过connection来获取,并且把sql传进去了,我们看看mybaits中是怎么创建preparestatement的
创建preparedstatement
preparedstatementhandler
stmt = handler.prepare(connection, transaction.gettimeout()); public statement prepare(connection connection, integer transactiontimeout) throws sqlexception { statement statement = null; try { // 创建 statement statement = instantiatestatement(connection); // 设置超时和 fetchsize setstatementtimeout(statement, transactiontimeout); setfetchsize(statement); return statement; } catch (sqlexception e) { closestatement(statement); throw e; } catch (exception e) { closestatement(statement); throw new executorexception("error preparing statement. cause: " + e, e); } } protected statement instantiatestatement(connection connection) throws sqlexception { //获取sql字符串,比如"select * from user where id= ?" string sql = boundsql.getsql(); // 根据条件调用不同的 preparestatement 方法创建 preparedstatement if (mappedstatement.getkeygenerator() instanceof jdbc3keygenerator) { string[] keycolumnnames = mappedstatement.getkeycolumns(); if (keycolumnnames == null) { //通过connection获取statement,将sql语句传进去 return connection.preparestatement(sql, preparedstatement.return_generated_keys); } else { return connection.preparestatement(sql, keycolumnnames); } } else if (mappedstatement.getresultsettype() != null) { return connection.preparestatement(sql, mappedstatement.getresultsettype().getvalue(), resultset.concur_read_only); } else { return connection.preparestatement(sql); } }
看到没和jdbc的形式一模一样,我们具体来看看connection.preparestatement做了什么
1 public preparedstatement preparestatement(string sql, int resultsettype, int resultsetconcurrency) throws sqlexception { 2 3 boolean canserverprepare = true; 4 5 string nativesql = getprocessescapecodesforprepstmts() ? nativesql(sql) : sql; 6 7 if (this.useserverpreparedstmts && getemulateunsupportedpstmts()) { 8 canserverprepare = canhandleasserverpreparedstatement(nativesql); 9 } 10 11 if (this.useserverpreparedstmts && getemulateunsupportedpstmts()) { 12 canserverprepare = canhandleasserverpreparedstatement(nativesql); 13 } 14 15 if (this.useserverpreparedstmts && canserverprepare) { 16 if (this.getcachepreparedstatements()) { 17 ...... 18 } else { 19 try { 20 //这里使用的是serverpreparedstatement创建preparedstatement 21 pstmt = serverpreparedstatement.getinstance(getmultihostsafeproxy(), nativesql, this.database, resultsettype, resultsetconcurrency); 22 23 pstmt.setresultsettype(resultsettype); 24 pstmt.setresultsetconcurrency(resultsetconcurrency); 25 } catch (sqlexception sqlex) { 26 // punt, if necessary 27 if (getemulateunsupportedpstmts()) { 28 pstmt = (preparedstatement) clientpreparestatement(nativesql, resultsettype, resultsetconcurrency, false); 29 } else { 30 throw sqlex; 31 } 32 } 33 } 34 } else { 35 pstmt = (preparedstatement) clientpreparestatement(nativesql, resultsettype, resultsetconcurrency, false); 36 } 37 }
我们只用看最关键的第21行代码,使用serverpreparedstatement的getinstance返回一个preparedstatement,其实本质上serverpreparedstatement继承了preparedstatement对象,我们看看其构造方法
protected serverpreparedstatement(connectionimpl conn, string sql, string catalog, int resultsettype, int resultsetconcurrency) throws sqlexception { //略... try { this.serverprepare(sql); } catch (sqlexception var10) { this.realclose(false, true); throw var10; } catch (exception var11) { this.realclose(false, true); sqlexception sqlex = sqlerror.createsqlexception(var11.tostring(), "s1000", this.getexceptioninterceptor()); sqlex.initcause(var11); throw sqlex; } //略... }
继续调用this.serverprepare(sql);
public class serverpreparedstatement extends preparedstatement { //存放运行时参数的数组 private serverpreparedstatement.bindvalue[] parameterbindings; //服务器预编译好的sql语句返回的serverstatementid private long serverstatementid; private void serverprepare(string sql) throws sqlexception { synchronized(this.connection.getmutex()) { mysqlio mysql = this.connection.getio(); try { //向sql服务器发送了一条prepare指令 buffer prepareresultpacket = mysql.sendcommand(mysqldefs.com_prepare, sql, (buffer)null, false, characterencoding, 0); //记录下了预编译好的sql语句所对应的serverstatementid this.serverstatementid = prepareresultpacket.readlong(); this.fieldcount = prepareresultpacket.readint(); //获取参数个数,比喻 select * from user where id= ?and name = ?,其中有两个?,则这里返回的参数个数应该为2 this.parametercount = prepareresultpacket.readint(); this.parameterbindings = new serverpreparedstatement.bindvalue[this.parametercount]; for(int i = 0; i < this.parametercount; ++i) { //根据参数个数,初始化数组 this.parameterbindings[i] = new serverpreparedstatement.bindvalue(); } } catch (sqlexception var16) { throw sqlex; } finally { this.connection.getio().clearinputstream(); } } } }
serverpreparedstatement继承preparedstatement,serverpreparedstatement初始化的时候就向sql服务器发送了一条prepare指令,把sql语句传到mysql服务器,如select * from user where id= ?and name = ?,mysql服务器会对sql进行编译,并保存在服务器,返回预编译语句对应的id,并保存在
serverpreparedstatement中,同时创建bindvalue[] parameterbindings数组,后面设置参数就直接添加到此数组中。好了,此时我们创建了一个serverpreparedstatement并返回,下面就是设置运行时参数了
设置运行时参数到 sql 中
我们已经获取到了preparedstatement,接下来就是将运行时参数设置到preparedstatement中,如下代码
handler.parameterize(stmt);
jdbc是怎么设置的呢?我们看看上面的例子,很简单吧
psmt = conn.preparestatement(sql); //设置参数 psmt.setstring(1, username); psmt.setstring(2, password);
我们来看看parameterize方法
public void parameterize(statement statement) throws sqlexception { // 通过参数处理器 parameterhandler 设置运行时参数到 preparedstatement 中 parameterhandler.setparameters((preparedstatement) statement); } public class defaultparameterhandler implements parameterhandler { private final typehandlerregistry typehandlerregistry; private final mappedstatement mappedstatement; private final object parameterobject; private final boundsql boundsql; private final configuration configuration; public void setparameters(preparedstatement ps) { /* * 从 boundsql 中获取 parametermapping 列表,每个 parametermapping 与原始 sql 中的 #{xxx} 占位符一一对应 */ list<parametermapping> parametermappings = boundsql.getparametermappings(); if (parametermappings != null) { for (int i = 0; i < parametermappings.size(); i++) { parametermapping parametermapping = parametermappings.get(i); if (parametermapping.getmode() != parametermode.out) { object value; // 获取属性名 string propertyname = parametermapping.getproperty(); if (boundsql.hasadditionalparameter(propertyname)) { value = boundsql.getadditionalparameter(propertyname); } else if (parameterobject == null) { value = null; } else if (typehandlerregistry.hastypehandler(parameterobject.getclass())) { value = parameterobject; } else { // 为用户传入的参数 parameterobject 创建元信息对象 metaobject metaobject = configuration.newmetaobject(parameterobject); // 从用户传入的参数中获取 propertyname 对应的值 value = metaobject.getvalue(propertyname); } typehandler typehandler = parametermapping.gettypehandler(); jdbctype jdbctype = parametermapping.getjdbctype(); if (value == null && jdbctype == null) { jdbctype = configuration.getjdbctypefornull(); } try { // 由类型处理器 typehandler 向 parameterhandler 设置参数 typehandler.setparameter(ps, i + 1, value, jdbctype); } catch (typeexception e) { throw new typeexception(...); } catch (sqlexception e) { throw new typeexception(...); } } } } } }
首先从boundsql中获取parametermappings 集合,这块大家可以看看我前面的文章,然后遍历获取 parametermapping中的propertyname ,如#{name} 中的name,然后从运行时参数parameterobject中获取name对应的参数值,最后设置到preparedstatement 中,我们主要来看是如何设置参数的。也就是
typehandler.setparameter(ps, i + 1, value, jdbctype);,这句代码最终会向我们例子中一样执行,如下
public void setnonnullparameter(preparedstatement ps, int i, string parameter, jdbctype jdbctype) throws sqlexception { ps.setstring(i, parameter); }
还记得我们的preparedstatement是什么吗?是serverpreparedstatement,那我们就来看看serverpreparedstatement的setstring方法
public void setstring(int parameterindex, string x) throws sqlexception { this.checkclosed(); if (x == null) { this.setnull(parameterindex, 1); } else { //根据参数下标从parameterbindings数组总获取bindvalue serverpreparedstatement.bindvalue binding = this.getbinding(parameterindex, false); this.settype(binding, this.stringtypecode); //设置参数值 binding.value = x; binding.isnull = false; binding.islongdata = false; } } protected serverpreparedstatement.bindvalue getbinding(int parameterindex, boolean forlongdata) throws sqlexception { this.checkclosed(); if (this.parameterbindings.length == 0) { throw sqlerror.createsqlexception(messages.getstring("serverpreparedstatement.8"), "s1009", this.getexceptioninterceptor()); } else { --parameterindex; if (parameterindex >= 0 && parameterindex < this.parameterbindings.length) { if (this.parameterbindings[parameterindex] == null) { this.parameterbindings[parameterindex] = new serverpreparedstatement.bindvalue(); } else if (this.parameterbindings[parameterindex].islongdata && !forlongdata) { this.detectedlongparameterswitch = true; } this.parameterbindings[parameterindex].isset = true; this.parameterbindings[parameterindex].boundbeforeexecutionnum = (long)this.numberofexecutions; //根据参数下标从parameterbindings数组总获取bindvalue return this.parameterbindings[parameterindex]; } else { throw sqlerror.createsqlexception(messages.getstring("serverpreparedstatement.9") + (parameterindex + 1) + messages.getstring("serverpreparedstatement.10") + this.parameterbindings.length, "s1009", this.getexceptioninterceptor()); } } }
就是根据参数下标从serverpreparedstatement的参数数组parameterbindings中获取bindvalue对象,然后设置值,好了现在serverpreparedstatement包含了预编译sql语句的id和参数数组,最后一步便是执行sql了。
执行查询
执行查询操作就是我们文章开头的最后一行代码,如下
return handler.<e>query(stmt, resulthandler);
我们来看看query是怎么做的
public <e> list<e> query(statement statement, resulthandler resulthandler) throws sqlexception { preparedstatement ps = (preparedstatement)statement; //直接执行serverpreparedstatement的execute方法 ps.execute(); return this.resultsethandler.handleresultsets(ps); } public boolean execute() throws sqlexception { this.checkclosed(); connectionimpl locallyscopedconn = this.connection; if (!this.checkreadonlysafestatement()) { throw sqlerror.createsqlexception(messages.getstring("preparedstatement.20") + messages.getstring("preparedstatement.21"), "s1009", this.getexceptioninterceptor()); } else { resultsetinternalmethods rs = null; cachedresultsetmetadata cachedmetadata = null; synchronized(locallyscopedconn.getmutex()) { //略.... rs = this.executeinternal(rowlimit, sendpacket, dostreaming, this.firstcharofstmt == 's', metadatafromcache, false); //略.... } return rs != null && rs.reallyresult(); } }
省略了很多代码,只看最关键的executeinternal
serverpreparedstatement
protected resultsetinternalmethods executeinternal(int maxrowstoretrieve, buffer sendpacket, boolean createstreamingresultset, boolean queryisselectonly, field[] metadatafromcache, boolean isbatch) throws sqlexception { try { return this.serverexecute(maxrowstoretrieve, createstreamingresultset, metadatafromcache); } catch (sqlexception var11) { throw sqlex; } } private resultsetinternalmethods serverexecute(int maxrowstoretrieve, boolean createstreamingresultset, field[] metadatafromcache) throws sqlexception { synchronized(this.connection.getmutex()) { //略.... mysqlio mysql = this.connection.getio(); buffer packet = mysql.getsharedsendpacket(); packet.clear(); packet.writebyte((byte)mysqldefs.com_execute); //将该语句对应的id写入数据包 packet.writelong(this.serverstatementid); int i; //将对应的参数写入数据包 for(i = 0; i < this.parametercount; ++i) { if (!this.parameterbindings[i].islongdata) { if (!this.parameterbindings[i].isnull) { this.storebinding(packet, this.parameterbindings[i], mysql); } else { nullbitsbuffer[i / 8] = (byte)(nullbitsbuffer[i / 8] | 1 << (i & 7)); } } } //发送数据包,表示执行id对应的预编译sql buffer resultpacket = mysql.sendcommand(mysqldefs.com_execute, (string)null, packet, false, (string)null, 0); //略.... resultsetimpl rs = mysql.readallresults(this, this.resultsettype, resultpacket, true, (long)this.fieldcount, metadatafromcache); //返回结果 return rs; } }
serverpreparedstatement在记录下serverstatementid后,对于相同sql模板的操作,每次只是发送serverstatementid和对应的参数,省去了编译sql的过程。 至此我们的已经从数据库拿到了查询结果,但是结果是resultsetimpl类型,我们还需要将返回结果转化成我们的java对象呢,留在下一篇来讲吧