深入理解MyBatis-Executor执行体系
深入理解MyBatis-Executor执行体系
序言
MyBatis做为一个半ORM框架,现阶段在国内可谓如日中天,不论是前几年的SSM框架体系还是如今的微服务,企业级系统与数据库的交互基本使用的都是MyBatis。MyBatis作为一款轻量级框架,其架构设计与代码质量上,都十分的优秀。学习难度与成本均不算高,对于想学习源代码的同学来说,选择MyBatis作为入门十分适合。出于兴趣与爱好,同时也作为学习,提升自己。笔者将开始以深入理解MyBatis为主题的连载,经过一段时间的学习和规划,笔者将关于MyBatis的系列文章分为8个章节,作为对这段时间的学习与源码阅读的总结。若有感兴趣的同学看到这里,欢迎留言讨论。如果文章中有任何错误的地方,也感谢同学们帮忙指正。
概要
- 回顾JDBC
- MyBatis执行过程
- Executor执行器体系
1、回顾JDBC
1、概述
JDBC(Java Data Base Connectivity)是一种用于执行SQL语句的Java API,从根本上来说JDBC是一种规范,为多种关系数据库提供统一的访问接口,允许便携式访问底层数据库。对于从业Java的同学来说,都不会对JDBC感到陌生。
2、执行流程
以往不论是学习还是使用JDBC时,我们的代码基本都根据以下流程来完成一次与数据库的交互。
- 通过DriverManager获取数据库连接
- 预编译SQL语句,进行SQL参数填充
- 执行PrepareStatement并获取执行结果
- 将SQL的执行结果解析成我们需要的Bean对象
下面的代码演示了使用JDBC执行一次简单的查询操作
/**
* @author Emove
* @date 2020/5/30
*/
public class JdbcTest {
private final static String URL = "jdbc:mysql://localhost:3306/test";
private final static String USER = "test";
private final static String PWD = "123456";
private Connection connection;
@Before
public void connect() throws SQLException {
connection = DriverManager.getConnection(URL, USER, PWD);
}
@After
public void close() throws SQLException {
connection.close();
}
@Test
public void run() throws SQLException {
String sql = "SELECT * FROM users WHERE `name`=?";
PreparedStatement psm = connection.prepareStatement(sql);
psm.setString(1, "Emove");
psm.execute();
ResultSet resultSet = psm.getResultSet();
while (resultSet.next()) {
System.out.println(resultSet.getString(1));
System.out.println(resultSet.getString(2));
}
resultSet.close();
psm.close();
}
}
但是如果每次使用数据库,都需要使用这样的一段代码,自己获取一次数据库连接,拼接SQL、设置参数、处理结果集、再关闭连接。如果参数一多,整个流程下来,时间与人工成功都非常的高,也非常的繁琐,特别是在复杂SQL的情况下,同时使代码看起来十分的臃肿。MyBatis的出现,为的就是解决上述的问题,MyBatis通过封装上述的所有步骤,允许程序员自己编写SQL,配置好参数映射等,即可随时完成与数据库的交互,将程序员从繁琐的查询与处理SQL中解放出来。
3、JDBC中的三种执行器
在Java的sql包中,有三个SQL执行器接口Statement(简单执行器)、PreparedStatement(预处理执行器)和CallableStatement(存储过程执行器)。
- Statement:执行静态SQL、批处理和设置加载行数
- PreparedStatement:设置预编译参数,用于实现防止SQL注入
- Callable Statement:与存储过程相关,可设置出参,读取出参
2、MyBatis执行过程
上图为MyBatis查询执行过程图,图中包含一次查询过程中,MyBatis的逻辑流程及分支。将以上流程进行划分,可大致分为以下三个部分:
- 从查询节点开始,到会话查询节点为止,为dao层与MyBatis交互逻辑,在此过程中,会获取一个SqlSession对象,该对象作用域为一次查询会话。
- 从会话查询节点开始,到查询数据库节点为止,为MyBatis的执行器(Executor)逻辑执行过程。在这段流程中,主要为MyBatis以执行的statementid(即执行的sql映射ID)为目标对象进行的缓存、事务管理。(本文主要就是讲解Executor的执行体系,至于具体的逻辑代码,下文会有源码分析)
- 从查询数据库节点开始,到剩余的其他分支。为第一小节中,对JDBC的执行流程的封装及处理。
下图是MyBatis查询的一次执行流程
SqlSession
SqlSession是MyBatis中的最重要的构建之一,是MyBatis对外提供的门面,用于提供增删改查、事务管理等功能,类似于JDBC里的Connection。它是应用层及MyBatis与DB之间执行交互操作的一个单线程对象,也是MyBatis执行持久化操作的关键对象。但由于SqlSession的实例不是线程安全的,因此是不能被共享的,所以它的最佳的作用域是一次请求或方法作用域。
在MyBatis中,DefaultSqlSession继承了SqlSession接口,是SqlSession的默认实现,在该实现类中,包含了两个重要对象Configuration和Executor。Configuration也被称为MyBatis的大管家,贯穿了MyBatis的一次执行流程。而Executor则是后续查询逻辑操作的主要载体对象之一,下面将主要分析Executor的执行体系。
/**
* The default implementation for {@link SqlSession}.
* Note that this class is not Thread-Safe.
*
* @author Clinton Begin
*/
public class DefaultSqlSession implements SqlSession {
private final Configuration configuration;
private final Executor executor;
private final boolean autoCommit;
private boolean dirty;
private List<Cursor<?>> cursorList;
public DefaultSqlSession(Configuration configuration, Executor executor, boolean autoCommit) {
this.configuration = configuration;
this.executor = executor;
this.dirty = false;
this.autoCommit = autoCommit;
}
public DefaultSqlSession(Configuration configuration, Executor executor) {
this(configuration, executor, false);
}
...
}
3、Executor执行器体系
在前文的简单介绍中,我们已经知道,SqlSession对象只是一个门面,其后续的执行工作都交给了Executor。
SqlSession作为MyBatis的门面,用于提供MyBatis对应用提供的操作接口,其中除了基本的增删改查之外,还有事务相关的提交回滚,会话关闭,清除缓存,获取configuration和获取数据连接等辅助API,主要是方便我们对方法的调用。在SqlSession对外提供的这些接口中,大部分都是Executor在提供支持。下面我们来看下Executor接口定义的方法。
可以看到,Executor中定义的方法大致都是与数据交互相关的方法,大致可以分为以下三类:
- 增删改查:增删改都包含在update方法中统一处理
- 事务相关:获取事务管理、提交回滚等
- 缓存相关:其中包含了MyBatis提供的一级、二级缓存等。
接下来,我们简单看下Executor的继承关系:
MyBatis中,Executor提供了四个实现,其中ClosedExecutor为一个内部类,只实现了isClosed方法,忽略不计。先简单介绍下各个处理器的功能:
- BaseExecutor:基础执行器,是一个抽象类,提供了关于SimpleExecutor、ReuseExecutor、BatchExecutor共性相关的功能实现,此外,BaseExecutor还与MyBatis的一级缓存和事务管理相关
- SimpleExecutor:简单执行器,每次都会创建一个新的Statement(默认是PreparedStatement)
- ReuseExecutor:重用执行器,相同的Sql语句会使用同一个Statement
- BatchExecutor:批处理执行器,批处理提交修改,必须执行flushStatement才会生效
- CachingExecutor:缓存执行器,与MyBatis二级缓存相关
1、BaseExecutor
通过前文第二小节的介绍,我们可以知道,在每次通过代理调用SqlSession时,获取到的SqlSession都是一个新的对象。SqlSession中的属性Executor,与SqlSession是一对一的关系,同样是线程不安全的。
BaseExecutor作为SimpleExecutor、ReuseExecutor、BatchExecutor的直接父类,其实现了子类中的一些共性功能,下面先来简单的分析下BaseExecutor的源码。BaseExecutor中代码较多,先挑比较简单和重要的代码讲解,目前不是很重要的代码均删除以减轻看代码时的压力。之后有必要时会详述,如果感兴趣也可以阅读源码。
public abstract class BaseExecutor implements Executor {
// 事务管理接口、定义了与JDBC执行相关的功能,如获取连接、提交、回滚、关闭连接等操作
protected Transaction transaction;
protected Executor wrapper;
/** 下方三个对象均是与缓存相关的属性, 作用域此次执行对象的生命周期范围 */
protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;
protected PerpetualCache localCache;
protected PerpetualCache localOutputParameterCache;
protected Configuration configuration;
/** queryStack为查询栈,与动态sql中的嵌套子查询相关 */
protected int queryStack;
private boolean closed;
protected BaseExecutor(Configuration configuration, Transaction transaction) {
...
}
@Override
public Transaction getTransaction() {
...
}
/**
* 关闭当前对象、关闭之前判断是否需要回滚
*/
@Override
public void close(boolean forceRollback) {
try {
try {
rollback(forceRollback);
} finally {
if (transaction != null) {
transaction.close();
}
}
} catch (SQLException e) {
// Ignore. There's nothing that can be done at this point.
} finally {
// 清空本地属性
}
}
@Override
public boolean isClosed() {
return closed;
}
@Override
public int update(MappedStatement ms, Object parameter) throws SQLException {
// 省略与错误记录相关的代码, 详情请看ErrorContext类的实现,其使用LocalThread记录线程执行时的错误信息。同样逻辑在query方法中也有
if (closed) {
throw new ExecutorException("Executor was closed.");
}
clearLocalCache();
return doUpdate(ms, parameter);
}
@Override
public List<BatchResult> flushStatements() throws SQLException {
return flushStatements(false);
}
public List<BatchResult> flushStatements(boolean isRollBack) throws SQLException {
// ...
return doFlushStatements(isRollBack);
}
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
BoundSql boundSql = ms.getBoundSql(parameter);
CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
@SuppressWarnings("unchecked")
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
if (queryStack == 0 && ms.isFlushCacheRequired()) {
// 如果不是嵌套查询,并且配置了flushCache为true,清空一级缓存
clearLocalCache();
}
List<E> list;
try {
queryStack++;
// 如果resultHandler为空,尝试从一级缓存中获取数据
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) {
// 该方法实现与存储过程相关,开发中基本不会用到
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
// 一级缓存中没有对应数据,从数据库加载
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
if (queryStack == 0) {
// 延迟装载相关逻辑,后文会讲
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
deferredLoads.clear();
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
// 如果全局配置的一级缓存作用域为statement,清除本地缓存
clearLocalCache();
}
}
return list;
}
@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
// 创建CacheKey逻辑,主要与mappedStatement的ID,参数,分页参数和sql有关
...
return cacheKey;
}
@Override
public boolean isCached(MappedStatement ms, CacheKey key) {
return localCache.getObject(key) != null;
}
@Override
public void commit(boolean required) throws SQLException {
if (closed) {
throw new ExecutorException("Cannot commit, transaction is already closed");
}
clearLocalCache();
flushStatements();
if (required) {
transaction.commit();
}
}
@Override
public void rollback(boolean required) throws SQLException {
if (!closed) {
try {
clearLocalCache();
flushStatements(true);
} finally {
if (required) {
transaction.rollback();
}
}
}
}
/**
* 清空本地缓存、在BaseExecutor中,update、query、commit、rollback方法中,均有调用回滚的逻辑,具体请看上述对应方法的实现
*/
@Override
public void clearLocalCache() {
if (!closed) {
localCache.clear();
localOutputParameterCache.clear();
}
}
/** 执行更新操作 */
protected abstract int doUpdate(MappedStatement ms, Object parameter)
throws SQLException;
/** 执行Statem,填充结果、与批量执行相关 */
protected abstract List<BatchResult> doFlushStatements(boolean isRollback)
throws SQLException;
/** 执行查询操作 */
protected abstract <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
throws SQLException;
/** 关闭Statement对象 */
protected void closeStatement(Statement statement) {
if (statement != null) {
try {
statement.close();
} catch (SQLException e) {
// ignore
}
}
}
/**
* Apply a transaction timeout. 设置事务超时
* @param statement a current statement
* @throws SQLException if a database access error occurs, this method is called on a closed <code>Statement</code>
* @since 3.4.0
* @see StatementUtil#applyTransactionTimeout(Statement, Integer, Integer)
*/
protected void applyTransactionTimeout(Statement statement) throws SQLException {
StatementUtil.applyTransactionTimeout(statement, statement.getQueryTimeout(), transaction.getTimeout());
}
private void handleLocallyCachedOutputParameters(MappedStatement ms, CacheKey key, Object parameter, BoundSql boundSql) {
//
}
/** 查询数据库 */
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
List<E> list;
// 插入key和占位符,为一级缓存做准备
localCache.putObject(key, EXECUTION_PLACEHOLDER);
try {
list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
} finally {
localCache.removeObject(key);
}
// 保存执行结果作为以及缓存
localCache.putObject(key, list);
if (ms.getStatementType() == StatementType.CALLABLE) {
localOutputParameterCache.putObject(key, parameter);
}
return list;
}
/** 获取连接 */
protected Connection getConnection(Log statementLog) throws SQLException {
Connection connection = transaction.getConnection();
if (statementLog.isDebugEnabled()) {
return ConnectionLogger.newInstance(connection, statementLog, queryStack);
} else {
return connection;
}
}
}
从代码中可以看出来,BaseExecutor主要为了子类提供了一些共性方法的实现,如事务相关的提交和回滚,一级缓存和获取数据库连接等。同时,定义了doQuery、doUpdate、doFlushStatement等抽象方法,由对应的子类去实现具体逻辑。以query方法为例,在queryFormDatabase中最终调用doQuery执行查询。除此之外,BaseExecutor中的方法实现逻辑都比较的简单清晰,建议仔细多看几遍。关于一级缓存、嵌套子查询等,后续会有文章单独的仔细介绍。这节将重点放在Executor的整体实现上,不必放大某些细节实现。
2、SimpleExecutor
顾名思义,SimpleExecutor为简单执行器,MyBatis默认配置的执行器,实现上,在很多实际开发中,如果没有配置指定执行器,我们使用的都是SimpleExecutor。
下面我们先来看下SimpleExecutor的代码。
public class SimpleExecutor extends BaseExecutor {
public SimpleExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
//update
@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
Statement stmt = null;
try {
// 通过MappedStatement对象来获取configuration,但是获取到的configuration对象与SimpleExecutor构造器传进来的configuration是同一个对象
Configuration configuration = ms.getConfiguration();
// 新建一个StatementHandler
// 这里看到ResultHandler传入的是null
StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
// 准备语句
stmt = prepareStatement(handler, ms.getStatementLog());
// StatementHandler.update
return handler.update(stmt);
} finally {
closeStatement(stmt);
}
}
//select
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
// 新建一个StatementHandler
// 这里看到ResultHandler传入了
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
// 准备语句
stmt = prepareStatement(handler, ms.getStatementLog());
// StatementHandler.query
return handler.<E>query(stmt, resultHandler);
} finally {
closeStatement(stmt);
}
}
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
// doFlushStatements只是给batch用的,所以这里返回空
return Collections.emptyList();
}
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
Connection connection = getConnection(statementLog);
// 调用StatementHandler.prepare
stmt = handler.prepare(connection);
// 调用StatementHandler.parameterize
handler.parameterize(stmt);
return stmt;
}
}
以查询为例,当调用query方法时,经过BaseExecutor的处理后,调用queryFormDataBase时,其内部又调用了doQuery方法。所以实际上,真正去执行查询的方法为对应执行器的duQuery实现。但是在SimpleExecutor的doQuery方法中,我们可以看到,在executor层其实只做了三件事。
- 通过当前绑定执行的MappedStatement去获取一个对应的StatementHandler对象
- 创建Connection连接并生成Statement对象,并进行参数处理,每次使用SimpleExecutor执行查询或更新时,都会创建一个Statement对象
- 交由对应的StatementHandler去执行Statement
3、ReuseExecutor
ReuseExecutor名为重用执行器。重用执行器的作用是在一次会话中,重复使用同一个Statement对象以提升性能。以查询为例,每次执行查询时,都会去Statement缓存中查找,如果已经存在相同SQL的Statement对象,就会重复使用同一个Statement对象。那么ReuseExecutor是如何实现的呢?让我们来看下ReuseExecutor的代码。
public class ReuseExecutor extends BaseExecutor {
//可重用的执行器内部用了一个map,用来缓存SQL语句对应的Statement,即key为SQL
private final Map<String, Statement> statementMap = new HashMap<String, Statement>();
public ReuseExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
Configuration configuration = ms.getConfiguration();
//和SimpleExecutor一样,新建一个StatementHandler
//这里看到ResultHandler传入的是null
StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
//准备语句
Statement stmt = prepareStatement(handler, ms.getStatementLog());
return handler.update(stmt);
}
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
Statement stmt = prepareStatement(handler, ms.getStatementLog());
return handler.<E>query(stmt, resultHandler);
}
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
// flush的时候清除缓存
for (Statement stmt : statementMap.values()) {
closeStatement(stmt);
}
statementMap.clear();
return Collections.emptyList();
}
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
//得到绑定的SQL语句
BoundSql boundSql = handler.getBoundSql();
String sql = boundSql.getSql();
//如果缓存中已经有了,直接得到Statement
if (hasStatementFor(sql)) {
stmt = getStatement(sql);
} else {
//如果缓存没有找到,则和SimpleExecutor处理完全一样,然后加入缓存
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection);
putStatement(sql, stmt);
}
handler.parameterize(stmt);
return stmt;
}
private boolean hasStatementFor(String sql) {
try {
return statementMap.keySet().contains(sql) && !statementMap.get(sql).getConnection().isClosed();
} catch (SQLException e) {
return false;
}
}
private Statement getStatement(String s) {
return statementMap.get(s);
}
private void putStatement(String sql, Statement stmt) {
statementMap.put(sql, stmt);
}
}
ReuseExecutor实现Statement对象的复用逻辑很简单,在类内部维护一个Map,使用对应的SQL作为key来缓存同个会话中的Statement对象,每次需要执行时,都先去本地的缓存Map中查找,如果存在则直接使用之前的Statement对象,如果不存在再去创建Statement对象。
那么在同一个会话中,如果StatementId不同,能否命中ReuseExecutor中的Statement缓存?答案是可以的,只要最终的SQL一致,就可以复用缓存中的Statement对象,但也仅限于同一会话内。
4、BatchExecutor
BatchExecutor名为批量处理器,用来批量执行SQL,但是注意,批量处理只对更新(增删改)语句有效,对查询无效。在同一个会话内,增删改SQL会一次性处理好SQL和参数然后发送给数据库,从而减少与数据库的交互次数,减少IO消耗,提升性能。
public class BatchExecutor extends BaseExecutor {
public static final int BATCH_UPDATE_RETURN_VALUE = Integer.MIN_VALUE + 1002;
/** 批量执行的Statement对象 **/
private final List<Statement> statementList = new ArrayList<Statement>();
/** 批量执行的结果对象 **/
private final List<BatchResult> batchResultList = new ArrayList<BatchResult>();
/** 上一次处理的SQL语句 **/
private String currentSql;
/** 上一次处理的MappedStatement对象 **/
private MappedStatement currentStatement;
public BatchExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
@Override
public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
final Configuration configuration = ms.getConfiguration();
final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
final BoundSql boundSql = handler.getBoundSql();
// 获取SQL语句
final String sql = boundSql.getSql();
final Statement stmt;
// 判断当前要处理的sql语句是否等于上一次执行的sql,MappedStatement也是同理
// 只有这两个对象都满足时,才能复用上一次的Statement对象
if (sql.equals(currentSql) && ms.equals(currentStatement)) {
int last = statementList.size() - 1;
// 即使满足上述的两个条件,也只能从statement缓存list中获取最后一个对象
// 由此也可知,想要实现复用,相同的sql还必须满足连贯顺序
stmt = statementList.get(last);
BatchResult batchResult = batchResultList.get(last);
// 将参数保存到当前BatchResult中,稍后会处理
batchResult.addParameterObject(parameterObject);
} else {
// 创建和保存新的Statement对象和BatchResult对象
// 并将当前sql和MappedStatement设置为该次执行的对应对象
Connection connection = getConnection(ms.getStatementLog());
stmt = handler.prepare(connection);
currentSql = sql;
currentStatement = ms;
statementList.add(stmt);
batchResultList.add(new BatchResult(ms, sql, parameterObject));
}
handler.parameterize(stmt);
// 底层使用的时Statement接口来实现批量处理
handler.batch(stmt);
return BATCH_UPDATE_RETURN_VALUE;
}
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
throws SQLException {
Statement stmt = null;
try {
flushStatements();
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameterObject, rowBounds, resultHandler, boundSql);
Connection connection = getConnection(ms.getStatementLog());
stmt = handler.prepare(connection);
handler.parameterize(stmt);
return handler.<E>query(stmt, resultHandler);
} finally {
closeStatement(stmt);
}
}
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
try {
List<BatchResult> results = new ArrayList<BatchResult>();
if (isRollback) {
return Collections.emptyList();
}
for (int i = 0, n = statementList.size(); i < n; i++) {
Statement stmt = statementList.get(i);
BatchResult batchResult = batchResultList.get(i);
try {
// 执行并记录批量执行的数量
batchResult.setUpdateCounts(stmt.executeBatch());
MappedStatement ms = batchResult.getMappedStatement();
List<Object> parameterObjects = batchResult.getParameterObjects();
KeyGenerator keyGenerator = ms.getKeyGenerator();
if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
// 以下逻辑与执行insert时为映射的pojo对象设置自增长ID相关
// 核心是使用JDBC3的Statement.getGeneratedKeys
Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
} else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) {
for (Object parameter : parameterObjects) {
keyGenerator.processAfter(this, ms, stmt, parameter);
}
}
} catch (BatchUpdateException e) {
StringBuilder message = new StringBuilder();
message.append(batchResult.getMappedStatement().getId())
.append(" (batch index #")
.append(i + 1)
.append(")")
.append(" failed.");
if (i > 0) {
message.append(" ")
.append(i)
.append(" prior sub executor(s) completed successfully, but will be rolled back.");
}
throw new BatchExecutorException(message.toString(), e, results, batchResult);
}
results.add(batchResult);
}
return results;
} finally {
for (Statement stmt : statementList) {
closeStatement(stmt);
}
currentSql = null;
statementList.clear();
batchResultList.clear();
}
}
}
可以看出BatchExecutor类的代码相对与前面的两个类的实现要稍微复杂一些,复杂的地方主要集中在如何实现批量处理SQL及参数与执行insert语句后的填充key值处理。
在以上代码中,值得注意的时,在执行BatchResult的时候,如果是相同的SQL语句,会进行合并使用同一个Statement对象,但是有一个前提是,相同的SQL语句必须是相同的MappedStatement,并且SQL必须满足连贯的先后顺序。
说到这里,相信不少人都会有疑问,那么ReuseExecutor和BatchExecutor的区别是什么呢?
-
ReuseExecutor是使用同一个Statement对象,多次设置参数,多次执行,多次处理结果。BatchExecutor是多次设置参数,一次执行、一次性把SQL及参数发送给数据库,并进行一次结果处理
-
ReuseExecutor对查询、更新均有效,BatchExecutor只作用于更新
-
前面代码中注释到,BatchExecutor执行时,使用的时Statement接口的批量处理实现的。这是JDBC规范提供的批量处理支持。下面以PreparedStatement的batch方法实现为例。
@Override public void batch(Statement statement) throws SQLException { PreparedStatement ps = (PreparedStatement) statement; ps.addBatch(); }
5、CachingExecutor
CachingExecutor是缓存执行器,用于实现MyBatis的二级缓存,直接继承与Executor。关于二级缓存到底是怎么实现的,我将在后续单独写一篇文章介绍。在这个章节中,我们先来看看CachingExecutor中做了哪些事情,以及它是如何实现的。
public class CachingExecutor implements Executor {
/** 内部持有一个Executor对象,通过装饰着模式来增强原有executor的功能 **/
private Executor delegate;
/** 事务缓存管理器 **/
private TransactionalCacheManager tcm = new TransactionalCacheManager();
public CachingExecutor(Executor delegate) {
this.delegate = delegate;
delegate.setExecutorWrapper(this);
}
@Override
public Transaction getTransaction() {
return delegate.getTransaction();
}
@Override
public void close(boolean forceRollback) {
try {
//issues #499, #524 and #573
if (forceRollback) {
tcm.rollback();
} else {
tcm.commit();
}
} finally {
delegate.close(forceRollback);
}
}
@Override
public boolean isClosed() {
return delegate.isClosed();
}
@Override
public int update(MappedStatement ms, Object parameterObject) throws SQLException {
// 刷新缓存完再update
flushCacheIfRequired(ms);
return delegate.update(ms, parameterObject);
}
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
BoundSql boundSql = ms.getBoundSql(parameterObject);
// query时传入一个cachekey参数
CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
//被ResultLoader.selectList调用
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
throws SQLException {
Cache cache = ms.getCache();
// 默认情况下是没有开启缓存的(二级缓存).要开启二级缓存,你需要在你的 SQL 映射文件中添加一行: <cache/>
// 简单的说,就是先查CacheKey,查不到再委托给实际的执行器去查
if (cache != null) {
flushCacheIfRequired(ms);
if (ms.isUseCache() && resultHandler == null) {
ensureNoOutParams(ms, parameterObject, boundSql);
@SuppressWarnings("unchecked")
List<E> list = (List<E>) tcm.getObject(cache, key);
if (list == null) {
list = delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
tcm.putObject(cache, key, list);
}
return list;
}
}
return delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
@Override
public List<BatchResult> flushStatements() throws SQLException {
return delegate.flushStatements();
}
@Override
public void commit(boolean required) throws SQLException {
delegate.commit(required);
tcm.commit();
}
@Override
public void rollback(boolean required) throws SQLException {
try {
delegate.rollback(required);
} finally {
if (required) {
tcm.rollback();
}
}
}
private void ensureNoOutParams(MappedStatement ms, Object parameter, BoundSql boundSql) {
if (ms.getStatementType() == StatementType.CALLABLE) {
for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) {
if (parameterMapping.getMode() != ParameterMode.IN) {
throw new ExecutorException("Caching stored procedures with OUT params is not supported. Please configure useCache=false in " + ms.getId() + " statement.");
}
}
}
}
@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
return delegate.createCacheKey(ms, parameterObject, rowBounds, boundSql);
}
@Override
public boolean isCached(MappedStatement ms, CacheKey key) {
return delegate.isCached(ms, key);
}
@Override
public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {
delegate.deferLoad(ms, resultObject, property, key, targetType);
}
@Override
public void clearLocalCache() {
delegate.clearLocalCache();
}
private void flushCacheIfRequired(MappedStatement ms) {
Cache cache = ms.getCache();
if (cache != null && ms.isFlushCacheRequired()) {
// 如果配置了flushCache, 则清空缓存
tcm.clear(cache);
}
}
@Override
public void setExecutorWrapper(Executor executor) {
throw new UnsupportedOperationException("This method should not be called");
}
}
CachingExecutor的实现,在设计上通过装饰者模式,在内部持有一个Executor对象,来给Executor的方法实现包装上缓存实现逻辑。同时,CachingExecutor中还有一个事务缓存管理器来实现事务数据的隔离性。
4、总结
在看完以上Executor执行体系的设计与实现后,我们来捋一捋一次查询时MyBatis的Executor方法执行栈。
-
当查询时,通过SqlSession的selectList方法进行开始执行(不管是执行selectOne方法还是selectList方法,最终调用的方法都是selectList)。SqlSession再调用内部持有的Executor的query方法执行。
@Override public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) { try { //根据statement id找到对应的MappedStatement MappedStatement ms = configuration.getMappedStatement(statement); //转而用执行器来查询结果,注意这里传入的ResultHandler是null return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER); } catch (Exception e) { throw ExceptionFactory.wrapException("Error querying database. Cause: " + e, e); } finally { ErrorContext.instance().reset(); } }
-
在开启二级缓存的时候,SqlSession内部的Executor对象实际上时CachingExecutor对象,在执行完相应的缓存逻辑后,调用CachingExecutor内部Executor对象的query方法。
-
由于在query方法的实现只在BaseExecutor中,也就是说,当CachingExecutor调用query方法时,实际上是在调用BaseExecutor的query方法。在前文我们说过BaseExecutor最终执行数据库查询时,调用的是子类的doQuery方法实现。
-
所以在最终执行查询时,会进入SimpleExecutor的doQuery方法,去完成一次真正的数据库查询
最后我们通过一张图来对本章节的Executor执行体系做一个回顾。
5、原文地址
本文地址:https://blog.csdn.net/weixin_40516653/article/details/107449258
下一篇: 网络性能应用检测系统