欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

深入理解MyBatis-Executor执行体系

程序员文章站 2022-03-07 11:29:26
深入理解MyBatis-Executor执行体系序言​MyBatis做为一个半ORM框架,现阶段在国内可谓如日中天,不论是前几年的SSM框架体系还是如今的微服务,企业级系统与数据库的交互基本使用的都是MyBatis。MyBatis作为一款轻量级框架,其架构设计与代码质量上,都十分的优秀。学习难度与成本均不算高,对于想学习源代码的同学来说,选择MyBatis作为入门十分适合。出于兴趣与爱好,同时也作为学习,提升自己。笔者将开始以深入理解MyBatis为主题的连载,经过一段时间的学习和规划,笔者将关于...

深入理解MyBatis-Executor执行体系


序言

​ MyBatis做为一个半ORM框架,现阶段在国内可谓如日中天,不论是前几年的SSM框架体系还是如今的微服务,企业级系统与数据库的交互基本使用的都是MyBatis。MyBatis作为一款轻量级框架,其架构设计与代码质量上,都十分的优秀。学习难度与成本均不算高,对于想学习源代码的同学来说,选择MyBatis作为入门十分适合。出于兴趣与爱好,同时也作为学习,提升自己。笔者将开始以深入理解MyBatis为主题的连载,经过一段时间的学习和规划,笔者将关于MyBatis的系列文章分为8个章节,作为对这段时间的学习与源码阅读的总结。若有感兴趣的同学看到这里,欢迎留言讨论。如果文章中有任何错误的地方,也感谢同学们帮忙指正。


概要

  • 回顾JDBC
  • MyBatis执行过程
  • Executor执行器体系

1、回顾JDBC

1、概述

​ JDBC(Java Data Base Connectivity)是一种用于执行SQL语句的Java API,从根本上来说JDBC是一种规范,为多种关系数据库提供统一的访问接口,允许便携式访问底层数据库。对于从业Java的同学来说,都不会对JDBC感到陌生。

2、执行流程

​ 以往不论是学习还是使用JDBC时,我们的代码基本都根据以下流程来完成一次与数据库的交互。

  1. 通过DriverManager获取数据库连接
  2. 预编译SQL语句,进行SQL参数填充
  3. 执行PrepareStatement并获取执行结果
  4. 将SQL的执行结果解析成我们需要的Bean对象

深入理解MyBatis-Executor执行体系

​ 下面的代码演示了使用JDBC执行一次简单的查询操作

/**
 * @author Emove
 * @date 2020/5/30
 */
public class JdbcTest {

    private final static String URL = "jdbc:mysql://localhost:3306/test";
    private final static String USER = "test";
    private final static String PWD = "123456";

    private Connection connection;

    @Before
    public void connect() throws SQLException {
        connection = DriverManager.getConnection(URL, USER, PWD);
    }

    @After
    public void close() throws SQLException {
        connection.close();
    }

    @Test
    public void run() throws SQLException {
        String sql = "SELECT * FROM users WHERE `name`=?";
        PreparedStatement psm = connection.prepareStatement(sql);
        psm.setString(1, "Emove");
        psm.execute();
        ResultSet resultSet = psm.getResultSet();
        while (resultSet.next()) {
            System.out.println(resultSet.getString(1));
            System.out.println(resultSet.getString(2));
        }
        resultSet.close();
        psm.close();
    }
}

​ 但是如果每次使用数据库,都需要使用这样的一段代码,自己获取一次数据库连接,拼接SQL、设置参数、处理结果集、再关闭连接。如果参数一多,整个流程下来,时间与人工成功都非常的高,也非常的繁琐,特别是在复杂SQL的情况下,同时使代码看起来十分的臃肿。MyBatis的出现,为的就是解决上述的问题,MyBatis通过封装上述的所有步骤,允许程序员自己编写SQL,配置好参数映射等,即可随时完成与数据库的交互,将程序员从繁琐的查询与处理SQL中解放出来。

3、JDBC中的三种执行器

​ 在Java的sql包中,有三个SQL执行器接口Statement(简单执行器)、PreparedStatement(预处理执行器)和CallableStatement(存储过程执行器)。

  1. Statement:执行静态SQL、批处理和设置加载行数
  2. PreparedStatement:设置预编译参数,用于实现防止SQL注入
  3. Callable Statement:与存储过程相关,可设置出参,读取出参
    深入理解MyBatis-Executor执行体系

2、MyBatis执行过程

深入理解MyBatis-Executor执行体系

​ 上图为MyBatis查询执行过程图,图中包含一次查询过程中,MyBatis的逻辑流程及分支。将以上流程进行划分,可大致分为以下三个部分:

  1. 从查询节点开始,到会话查询节点为止,为dao层与MyBatis交互逻辑,在此过程中,会获取一个SqlSession对象,该对象作用域为一次查询会话。
  2. 从会话查询节点开始,到查询数据库节点为止,为MyBatis的执行器(Executor)逻辑执行过程。在这段流程中,主要为MyBatis以执行的statementid(即执行的sql映射ID)为目标对象进行的缓存、事务管理。(本文主要就是讲解Executor的执行体系,至于具体的逻辑代码,下文会有源码分析)
  3. 从查询数据库节点开始,到剩余的其他分支。为第一小节中,对JDBC的执行流程的封装及处理。

下图是MyBatis查询的一次执行流程

深入理解MyBatis-Executor执行体系

SqlSession

​ SqlSession是MyBatis中的最重要的构建之一,是MyBatis对外提供的门面,用于提供增删改查、事务管理等功能,类似于JDBC里的Connection。它是应用层及MyBatis与DB之间执行交互操作的一个单线程对象,也是MyBatis执行持久化操作的关键对象。但由于SqlSession的实例不是线程安全的,因此是不能被共享的,所以它的最佳的作用域是一次请求或方法作用域。

​ 在MyBatis中,DefaultSqlSession继承了SqlSession接口,是SqlSession的默认实现,在该实现类中,包含了两个重要对象ConfigurationExecutor。Configuration也被称为MyBatis的大管家,贯穿了MyBatis的一次执行流程。而Executor则是后续查询逻辑操作的主要载体对象之一,下面将主要分析Executor的执行体系。

/**
 * The default implementation for {@link SqlSession}. 
 * Note that this class is not Thread-Safe.
 *
 * @author Clinton Begin
 */
public class DefaultSqlSession implements SqlSession {

  private final Configuration configuration;
  private final Executor executor;

  private final boolean autoCommit;
  private boolean dirty;
  private List<Cursor<?>> cursorList;

  public DefaultSqlSession(Configuration configuration, Executor executor, boolean autoCommit) {
    this.configuration = configuration;
    this.executor = executor;
    this.dirty = false;
    this.autoCommit = autoCommit;
  }

  public DefaultSqlSession(Configuration configuration, Executor executor) {
    this(configuration, executor, false);
  }
  
  ...
  
}

3、Executor执行器体系

​ 在前文的简单介绍中,我们已经知道,SqlSession对象只是一个门面,其后续的执行工作都交给了Executor。

​ SqlSession作为MyBatis的门面,用于提供MyBatis对应用提供的操作接口,其中除了基本的增删改查之外,还有事务相关的提交回滚,会话关闭,清除缓存,获取configuration和获取数据连接等辅助API,主要是方便我们对方法的调用。在SqlSession对外提供的这些接口中,大部分都是Executor在提供支持。下面我们来看下Executor接口定义的方法。

深入理解MyBatis-Executor执行体系

​ 可以看到,Executor中定义的方法大致都是与数据交互相关的方法,大致可以分为以下三类:

  1. 增删改查:增删改都包含在update方法中统一处理
  2. 事务相关:获取事务管理、提交回滚等
  3. 缓存相关:其中包含了MyBatis提供的一级、二级缓存等。

​ 接下来,我们简单看下Executor的继承关系:

深入理解MyBatis-Executor执行体系

​ MyBatis中,Executor提供了四个实现,其中ClosedExecutor为一个内部类,只实现了isClosed方法,忽略不计。先简单介绍下各个处理器的功能:

  1. BaseExecutor:基础执行器,是一个抽象类,提供了关于SimpleExecutor、ReuseExecutor、BatchExecutor共性相关的功能实现,此外,BaseExecutor还与MyBatis的一级缓存和事务管理相关
  2. SimpleExecutor:简单执行器,每次都会创建一个新的Statement(默认是PreparedStatement)
  3. ReuseExecutor:重用执行器,相同的Sql语句会使用同一个Statement
  4. BatchExecutor:批处理执行器,批处理提交修改,必须执行flushStatement才会生效
  5. CachingExecutor:缓存执行器,与MyBatis二级缓存相关

1、BaseExecutor

​ 通过前文第二小节的介绍,我们可以知道,在每次通过代理调用SqlSession时,获取到的SqlSession都是一个新的对象。SqlSession中的属性Executor,与SqlSession是一对一的关系,同样是线程不安全的。

​ BaseExecutor作为SimpleExecutor、ReuseExecutor、BatchExecutor的直接父类,其实现了子类中的一些共性功能,下面先来简单的分析下BaseExecutor的源码。BaseExecutor中代码较多,先挑比较简单和重要的代码讲解,目前不是很重要的代码均删除以减轻看代码时的压力。之后有必要时会详述,如果感兴趣也可以阅读源码。

public abstract class BaseExecutor implements Executor {
  // 事务管理接口、定义了与JDBC执行相关的功能,如获取连接、提交、回滚、关闭连接等操作
  protected Transaction transaction;
  protected Executor wrapper;

  /** 下方三个对象均是与缓存相关的属性, 作用域此次执行对象的生命周期范围 */
  protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;
  protected PerpetualCache localCache;
  protected PerpetualCache localOutputParameterCache;
    
  protected Configuration configuration;

  /** queryStack为查询栈,与动态sql中的嵌套子查询相关 */
  protected int queryStack;
  private boolean closed;
    
  protected BaseExecutor(Configuration configuration, Transaction transaction) {
    ...
  }

  @Override
  public Transaction getTransaction() {
  	...
  }

  /**
   * 关闭当前对象、关闭之前判断是否需要回滚
   */
  @Override
  public void close(boolean forceRollback) {
    try {
      try {
        rollback(forceRollback);
      } finally {
        if (transaction != null) {
          transaction.close();
        }
      }
    } catch (SQLException e) {
      // Ignore.  There's nothing that can be done at this point.
    } finally {
      // 清空本地属性
    }
  }

  @Override
  public boolean isClosed() {
    return closed;
  }

  @Override
  public int update(MappedStatement ms, Object parameter) throws SQLException {
    // 省略与错误记录相关的代码, 详情请看ErrorContext类的实现,其使用LocalThread记录线程执行时的错误信息。同样逻辑在query方法中也有
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    clearLocalCache();
    return doUpdate(ms, parameter);
  }

  @Override
  public List<BatchResult> flushStatements() throws SQLException {
    return flushStatements(false);
  }

  public List<BatchResult> flushStatements(boolean isRollBack) throws SQLException {
    // ...
    return doFlushStatements(isRollBack);
  }

  @Override
  public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    BoundSql boundSql = ms.getBoundSql(parameter);
    CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
    return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
  }

  @SuppressWarnings("unchecked")
  @Override
  public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    if (queryStack == 0 && ms.isFlushCacheRequired()) {
      // 如果不是嵌套查询,并且配置了flushCache为true,清空一级缓存
      clearLocalCache();
    }
    List<E> list;
    try {
      queryStack++;
      // 如果resultHandler为空,尝试从一级缓存中获取数据
      list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
      if (list != null) {
        // 该方法实现与存储过程相关,开发中基本不会用到
        handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
      } else {
        // 一级缓存中没有对应数据,从数据库加载
        list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
      }
    } finally {
      queryStack--;
    }
    if (queryStack == 0) {
      // 延迟装载相关逻辑,后文会讲
      for (DeferredLoad deferredLoad : deferredLoads) {
        deferredLoad.load();
      }
      deferredLoads.clear();
      if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
        // 如果全局配置的一级缓存作用域为statement,清除本地缓存
        clearLocalCache();
      }
    }
    return list;
  }

  @Override
  public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
    // 创建CacheKey逻辑,主要与mappedStatement的ID,参数,分页参数和sql有关
    ...
    return cacheKey;
  }

  @Override
  public boolean isCached(MappedStatement ms, CacheKey key) {
    return localCache.getObject(key) != null;
  }

  @Override
  public void commit(boolean required) throws SQLException {
    if (closed) {
      throw new ExecutorException("Cannot commit, transaction is already closed");
    }
    clearLocalCache();
    flushStatements();
    if (required) {
      transaction.commit();
    }
  }

  @Override
  public void rollback(boolean required) throws SQLException {
    if (!closed) {
      try {
        clearLocalCache();
        flushStatements(true);
      } finally {
        if (required) {
          transaction.rollback();
        }
      }
    }
  }

  /**
   * 清空本地缓存、在BaseExecutor中,update、query、commit、rollback方法中,均有调用回滚的逻辑,具体请看上述对应方法的实现
   */
  @Override
  public void clearLocalCache() {
    if (!closed) {
      localCache.clear();
      localOutputParameterCache.clear();
    }
  }

  /** 执行更新操作 */  
  protected abstract int doUpdate(MappedStatement ms, Object parameter)
      throws SQLException;
  /** 执行Statem,填充结果、与批量执行相关 */  
  protected abstract List<BatchResult> doFlushStatements(boolean isRollback)
      throws SQLException;
  /** 执行查询操作 */
  protected abstract <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
      throws SQLException;
    
  /** 关闭Statement对象 */
  protected void closeStatement(Statement statement) {
    if (statement != null) {
      try {
        statement.close();
      } catch (SQLException e) {
        // ignore
      }
    }
  }

  /**
   * Apply a transaction timeout. 设置事务超时
   * @param statement a current statement
   * @throws SQLException if a database access error occurs, this method is called on a closed <code>Statement</code>
   * @since 3.4.0
   * @see StatementUtil#applyTransactionTimeout(Statement, Integer, Integer)
   */
  protected void applyTransactionTimeout(Statement statement) throws SQLException {
    StatementUtil.applyTransactionTimeout(statement, statement.getQueryTimeout(), transaction.getTimeout());
  }

  private void handleLocallyCachedOutputParameters(MappedStatement ms, CacheKey key, Object parameter, BoundSql boundSql) {
    // 
  }

   /** 查询数据库 */
  private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    List<E> list;
    // 插入key和占位符,为一级缓存做准备
    localCache.putObject(key, EXECUTION_PLACEHOLDER);
    try {
      list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
    } finally {
      localCache.removeObject(key);
    }
    // 保存执行结果作为以及缓存
    localCache.putObject(key, list);
    if (ms.getStatementType() == StatementType.CALLABLE) {
      localOutputParameterCache.putObject(key, parameter);
    }
    return list;
  }
 
  /** 获取连接 */  
  protected Connection getConnection(Log statementLog) throws SQLException {
    Connection connection = transaction.getConnection();
    if (statementLog.isDebugEnabled()) {
      return ConnectionLogger.newInstance(connection, statementLog, queryStack);
    } else {
      return connection;
    }
  }
}

​ 从代码中可以看出来,BaseExecutor主要为了子类提供了一些共性方法的实现,如事务相关的提交和回滚,一级缓存和获取数据库连接等。同时,定义了doQuery、doUpdate、doFlushStatement等抽象方法,由对应的子类去实现具体逻辑。以query方法为例,在queryFormDatabase中最终调用doQuery执行查询。除此之外,BaseExecutor中的方法实现逻辑都比较的简单清晰,建议仔细多看几遍。关于一级缓存、嵌套子查询等,后续会有文章单独的仔细介绍。这节将重点放在Executor的整体实现上,不必放大某些细节实现。

2、SimpleExecutor

​ 顾名思义,SimpleExecutor为简单执行器,MyBatis默认配置的执行器,实现上,在很多实际开发中,如果没有配置指定执行器,我们使用的都是SimpleExecutor。

​ 下面我们先来看下SimpleExecutor的代码。

public class SimpleExecutor extends BaseExecutor {

  public SimpleExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
  }

  //update
  @Override
  public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
    Statement stmt = null;
    try {
      // 通过MappedStatement对象来获取configuration,但是获取到的configuration对象与SimpleExecutor构造器传进来的configuration是同一个对象
      Configuration configuration = ms.getConfiguration();
      // 新建一个StatementHandler
      // 这里看到ResultHandler传入的是null
      StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
      // 准备语句
      stmt = prepareStatement(handler, ms.getStatementLog());
      // StatementHandler.update
      return handler.update(stmt);
    } finally {
      closeStatement(stmt);
    }
  }

  //select
  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Statement stmt = null;
    try {
      Configuration configuration = ms.getConfiguration();
      // 新建一个StatementHandler
      // 这里看到ResultHandler传入了
      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
      // 准备语句
      stmt = prepareStatement(handler, ms.getStatementLog());
      // StatementHandler.query
      return handler.<E>query(stmt, resultHandler);
    } finally {
      closeStatement(stmt);
    }
  }

  @Override
  public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
	// doFlushStatements只是给batch用的,所以这里返回空
    return Collections.emptyList();
  }

  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Statement stmt;
    Connection connection = getConnection(statementLog);
    // 调用StatementHandler.prepare
    stmt = handler.prepare(connection);
    // 调用StatementHandler.parameterize
    handler.parameterize(stmt);
    return stmt;
  }
}

​ 以查询为例,当调用query方法时,经过BaseExecutor的处理后,调用queryFormDataBase时,其内部又调用了doQuery方法。所以实际上,真正去执行查询的方法为对应执行器的duQuery实现。但是在SimpleExecutor的doQuery方法中,我们可以看到,在executor层其实只做了三件事。

  1. 通过当前绑定执行的MappedStatement去获取一个对应的StatementHandler对象
  2. 创建Connection连接并生成Statement对象,并进行参数处理,每次使用SimpleExecutor执行查询或更新时,都会创建一个Statement对象
  3. 交由对应的StatementHandler去执行Statement

3、ReuseExecutor

​ ReuseExecutor名为重用执行器。重用执行器的作用是在一次会话中,重复使用同一个Statement对象以提升性能。以查询为例,每次执行查询时,都会去Statement缓存中查找,如果已经存在相同SQL的Statement对象,就会重复使用同一个Statement对象。那么ReuseExecutor是如何实现的呢?让我们来看下ReuseExecutor的代码。

public class ReuseExecutor extends BaseExecutor {

  //可重用的执行器内部用了一个map,用来缓存SQL语句对应的Statement,即key为SQL
  private final Map<String, Statement> statementMap = new HashMap<String, Statement>();

  public ReuseExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
  }

  @Override
  public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
    Configuration configuration = ms.getConfiguration();
    //和SimpleExecutor一样,新建一个StatementHandler
    //这里看到ResultHandler传入的是null
    StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
    //准备语句
    Statement stmt = prepareStatement(handler, ms.getStatementLog());
    return handler.update(stmt);
  }

  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Configuration configuration = ms.getConfiguration();
    StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
    Statement stmt = prepareStatement(handler, ms.getStatementLog());
    return handler.<E>query(stmt, resultHandler);
  }

  @Override
  public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    // flush的时候清除缓存
    for (Statement stmt : statementMap.values()) {
      closeStatement(stmt);
    }
    statementMap.clear();
    return Collections.emptyList();
  }

  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Statement stmt;
    //得到绑定的SQL语句
    BoundSql boundSql = handler.getBoundSql();
    String sql = boundSql.getSql();
    //如果缓存中已经有了,直接得到Statement
    if (hasStatementFor(sql)) {
      stmt = getStatement(sql);
    } else {
      //如果缓存没有找到,则和SimpleExecutor处理完全一样,然后加入缓存
      Connection connection = getConnection(statementLog);
      stmt = handler.prepare(connection);
      putStatement(sql, stmt);
    }
    handler.parameterize(stmt);
    return stmt;
  }

  private boolean hasStatementFor(String sql) {
    try {
      return statementMap.keySet().contains(sql) && !statementMap.get(sql).getConnection().isClosed();
    } catch (SQLException e) {
      return false;
    }
  }

  private Statement getStatement(String s) {
    return statementMap.get(s);
  }

  private void putStatement(String sql, Statement stmt) {
    statementMap.put(sql, stmt);
  }

}

​ ReuseExecutor实现Statement对象的复用逻辑很简单,在类内部维护一个Map,使用对应的SQL作为key来缓存同个会话中的Statement对象,每次需要执行时,都先去本地的缓存Map中查找,如果存在则直接使用之前的Statement对象,如果不存在再去创建Statement对象。

​ 那么在同一个会话中,如果StatementId不同,能否命中ReuseExecutor中的Statement缓存?答案是可以的,只要最终的SQL一致,就可以复用缓存中的Statement对象,但也仅限于同一会话内。

4、BatchExecutor

​ BatchExecutor名为批量处理器,用来批量执行SQL,但是注意,批量处理只对更新(增删改)语句有效,对查询无效。在同一个会话内,增删改SQL会一次性处理好SQL和参数然后发送给数据库,从而减少与数据库的交互次数,减少IO消耗,提升性能。

public class BatchExecutor extends BaseExecutor {

  public static final int BATCH_UPDATE_RETURN_VALUE = Integer.MIN_VALUE + 1002;

  /** 批量执行的Statement对象 **/
  private final List<Statement> statementList = new ArrayList<Statement>();
  /** 批量执行的结果对象 **/
  private final List<BatchResult> batchResultList = new ArrayList<BatchResult>();
  /** 上一次处理的SQL语句 **/
  private String currentSql;
  /** 上一次处理的MappedStatement对象 **/
  private MappedStatement currentStatement;

  public BatchExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
  }

  @Override
  public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
    final Configuration configuration = ms.getConfiguration();
    final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
    final BoundSql boundSql = handler.getBoundSql();
    // 获取SQL语句
    final String sql = boundSql.getSql();
    final Statement stmt;
    // 判断当前要处理的sql语句是否等于上一次执行的sql,MappedStatement也是同理
    // 只有这两个对象都满足时,才能复用上一次的Statement对象
    if (sql.equals(currentSql) && ms.equals(currentStatement)) {
      int last = statementList.size() - 1;
      // 即使满足上述的两个条件,也只能从statement缓存list中获取最后一个对象
      // 由此也可知,想要实现复用,相同的sql还必须满足连贯顺序
      stmt = statementList.get(last);
      BatchResult batchResult = batchResultList.get(last);
      // 将参数保存到当前BatchResult中,稍后会处理
      batchResult.addParameterObject(parameterObject);
    } else {
      // 创建和保存新的Statement对象和BatchResult对象
      // 并将当前sql和MappedStatement设置为该次执行的对应对象
      Connection connection = getConnection(ms.getStatementLog());
      stmt = handler.prepare(connection);
      currentSql = sql;
      currentStatement = ms;
      statementList.add(stmt);
      batchResultList.add(new BatchResult(ms, sql, parameterObject));
    }
    handler.parameterize(stmt);
    // 底层使用的时Statement接口来实现批量处理
    handler.batch(stmt); 
    return BATCH_UPDATE_RETURN_VALUE;
  }

  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
      throws SQLException {
    Statement stmt = null;
    try {
      flushStatements();
      Configuration configuration = ms.getConfiguration();
      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameterObject, rowBounds, resultHandler, boundSql);
      Connection connection = getConnection(ms.getStatementLog());
      stmt = handler.prepare(connection);
      handler.parameterize(stmt);
      return handler.<E>query(stmt, resultHandler);
    } finally {
      closeStatement(stmt);
    }
  }

  @Override
  public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    try {
      List<BatchResult> results = new ArrayList<BatchResult>();
      if (isRollback) {
        return Collections.emptyList();
      }
      for (int i = 0, n = statementList.size(); i < n; i++) {
        Statement stmt = statementList.get(i);
        BatchResult batchResult = batchResultList.get(i);
        try {
          // 执行并记录批量执行的数量
          batchResult.setUpdateCounts(stmt.executeBatch());
          MappedStatement ms = batchResult.getMappedStatement();
          List<Object> parameterObjects = batchResult.getParameterObjects();
          KeyGenerator keyGenerator = ms.getKeyGenerator();
          if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
            // 以下逻辑与执行insert时为映射的pojo对象设置自增长ID相关
            // 核心是使用JDBC3的Statement.getGeneratedKeys
            Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
            jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
          } else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { 
            for (Object parameter : parameterObjects) {
              keyGenerator.processAfter(this, ms, stmt, parameter);
            }
          }
        } catch (BatchUpdateException e) {
          StringBuilder message = new StringBuilder();
          message.append(batchResult.getMappedStatement().getId())
              .append(" (batch index #")
              .append(i + 1)
              .append(")")
              .append(" failed.");
          if (i > 0) {
            message.append(" ")
                .append(i)
                .append(" prior sub executor(s) completed successfully, but will be rolled back.");
          }
          throw new BatchExecutorException(message.toString(), e, results, batchResult);
        }
        results.add(batchResult);
      }
      return results;
    } finally {
      for (Statement stmt : statementList) {
        closeStatement(stmt);
      }
      currentSql = null;
      statementList.clear();
      batchResultList.clear();
    }
  }
}

​ 可以看出BatchExecutor类的代码相对与前面的两个类的实现要稍微复杂一些,复杂的地方主要集中在如何实现批量处理SQL及参数与执行insert语句后的填充key值处理。

​ 在以上代码中,值得注意的时,在执行BatchResult的时候,如果是相同的SQL语句,会进行合并使用同一个Statement对象,但是有一个前提是,相同的SQL语句必须是相同的MappedStatement,并且SQL必须满足连贯的先后顺序。

​ 说到这里,相信不少人都会有疑问,那么ReuseExecutor和BatchExecutor的区别是什么呢?

  1. ReuseExecutor是使用同一个Statement对象,多次设置参数,多次执行,多次处理结果。BatchExecutor是多次设置参数,一次执行、一次性把SQL及参数发送给数据库,并进行一次结果处理

  2. ReuseExecutor对查询、更新均有效,BatchExecutor只作用于更新

  3. 前面代码中注释到,BatchExecutor执行时,使用的时Statement接口的批量处理实现的。这是JDBC规范提供的批量处理支持。下面以PreparedStatement的batch方法实现为例。

    @Override
      public void batch(Statement statement) throws SQLException {
        PreparedStatement ps = (PreparedStatement) statement;
        ps.addBatch();
      }
    

5、CachingExecutor

​ CachingExecutor是缓存执行器,用于实现MyBatis的二级缓存,直接继承与Executor。关于二级缓存到底是怎么实现的,我将在后续单独写一篇文章介绍。在这个章节中,我们先来看看CachingExecutor中做了哪些事情,以及它是如何实现的。

public class CachingExecutor implements Executor {

  /** 内部持有一个Executor对象,通过装饰着模式来增强原有executor的功能 **/
  private Executor delegate;
  /** 事务缓存管理器 **/
  private TransactionalCacheManager tcm = new TransactionalCacheManager();

  public CachingExecutor(Executor delegate) {
    this.delegate = delegate;
    delegate.setExecutorWrapper(this);
  }

  @Override
  public Transaction getTransaction() {
    return delegate.getTransaction();
  }

  @Override
  public void close(boolean forceRollback) {
    try {
      //issues #499, #524 and #573
      if (forceRollback) { 
        tcm.rollback();
      } else {
        tcm.commit();
      }
    } finally {
      delegate.close(forceRollback);
    }
  }

  @Override
  public boolean isClosed() {
    return delegate.isClosed();
  }

  @Override
  public int update(MappedStatement ms, Object parameterObject) throws SQLException {
	// 刷新缓存完再update
    flushCacheIfRequired(ms);
    return delegate.update(ms, parameterObject);
  }

  @Override
  public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    BoundSql boundSql = ms.getBoundSql(parameterObject);
	// query时传入一个cachekey参数
    CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
    return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
  }

  //被ResultLoader.selectList调用
  @Override
  public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
      throws SQLException {
    Cache cache = ms.getCache();
    // 默认情况下是没有开启缓存的(二级缓存).要开启二级缓存,你需要在你的 SQL 映射文件中添加一行: <cache/>
    // 简单的说,就是先查CacheKey,查不到再委托给实际的执行器去查
    if (cache != null) {
      flushCacheIfRequired(ms);
      if (ms.isUseCache() && resultHandler == null) {
        ensureNoOutParams(ms, parameterObject, boundSql);
        @SuppressWarnings("unchecked")
        List<E> list = (List<E>) tcm.getObject(cache, key);
        if (list == null) {
          list = delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
          tcm.putObject(cache, key, list); 
        }
        return list;
      }
    }
    return delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
  }

  @Override
  public List<BatchResult> flushStatements() throws SQLException {
    return delegate.flushStatements();
  }

  @Override
  public void commit(boolean required) throws SQLException {
    delegate.commit(required);
    tcm.commit();
  }

  @Override
  public void rollback(boolean required) throws SQLException {
    try {
      delegate.rollback(required);
    } finally {
      if (required) {
        tcm.rollback();
      }
    }
  }

  private void ensureNoOutParams(MappedStatement ms, Object parameter, BoundSql boundSql) {
    if (ms.getStatementType() == StatementType.CALLABLE) {
      for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) {
        if (parameterMapping.getMode() != ParameterMode.IN) {
          throw new ExecutorException("Caching stored procedures with OUT params is not supported.  Please configure useCache=false in " + ms.getId() + " statement.");
        }
      }
    }
  }

  @Override
  public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
    return delegate.createCacheKey(ms, parameterObject, rowBounds, boundSql);
  }

  @Override
  public boolean isCached(MappedStatement ms, CacheKey key) {
    return delegate.isCached(ms, key);
  }

  @Override
  public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {
    delegate.deferLoad(ms, resultObject, property, key, targetType);
  }

  @Override
  public void clearLocalCache() {
    delegate.clearLocalCache();
  }

  private void flushCacheIfRequired(MappedStatement ms) {
    Cache cache = ms.getCache();
    if (cache != null && ms.isFlushCacheRequired()) {
      // 如果配置了flushCache, 则清空缓存
      tcm.clear(cache);
    }
  }

  @Override
  public void setExecutorWrapper(Executor executor) {
    throw new UnsupportedOperationException("This method should not be called");
  }

}

​ CachingExecutor的实现,在设计上通过装饰者模式,在内部持有一个Executor对象,来给Executor的方法实现包装上缓存实现逻辑。同时,CachingExecutor中还有一个事务缓存管理器来实现事务数据的隔离性。

4、总结

​ 在看完以上Executor执行体系的设计与实现后,我们来捋一捋一次查询时MyBatis的Executor方法执行栈。

  1. 当查询时,通过SqlSession的selectList方法进行开始执行(不管是执行selectOne方法还是selectList方法,最终调用的方法都是selectList)。SqlSession再调用内部持有的Executor的query方法执行。

    @Override
    public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
      try {
        //根据statement id找到对应的MappedStatement
        MappedStatement ms = configuration.getMappedStatement(statement);
        //转而用执行器来查询结果,注意这里传入的ResultHandler是null
        return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
      } catch (Exception e) {
        throw ExceptionFactory.wrapException("Error querying database.  Cause: " + e, e);
      } finally {
        ErrorContext.instance().reset();
      }
    }
    
  2. 在开启二级缓存的时候,SqlSession内部的Executor对象实际上时CachingExecutor对象,在执行完相应的缓存逻辑后,调用CachingExecutor内部Executor对象的query方法。

  3. 由于在query方法的实现只在BaseExecutor中,也就是说,当CachingExecutor调用query方法时,实际上是在调用BaseExecutor的query方法。在前文我们说过BaseExecutor最终执行数据库查询时,调用的是子类的doQuery方法实现。

  4. 所以在最终执行查询时,会进入SimpleExecutor的doQuery方法,去完成一次真正的数据库查询

​ 最后我们通过一张图来对本章节的Executor执行体系做一个回顾。

深入理解MyBatis-Executor执行体系

5、原文地址

深入理解MyBatis-Executor执行体系

本文地址:https://blog.csdn.net/weixin_40516653/article/details/107449258