
Introduction to the Mybatis Executor (Part 1)

Source: 程序员文章站, 2022-05-09 22:17:39

5       Introduction to the Mybatis Executor (Part 1)

Contents

5       Introduction to the Mybatis Executor (Part 1)

5.1        SimpleExecutor

5.2        ReuseExecutor

5.3        BatchExecutor

5.4        Choosing an Executor

5.4.1         The Default Executor

 

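       In Mybatis, every Mapper statement is executed through an Executor. Executor is one of Mybatis's core interfaces, and its definition is shown below. From its methods we can see that insert, update, and delete statements go through the Executor's update method, while queries go through the query method. The Executor interface has two implementing classes, BaseExecutor and CachingExecutor, and BaseExecutor in turn has the subclasses SimpleExecutor, ReuseExecutor, and BatchExecutor. BaseExecutor, however, is an abstract class: it implements only the shared plumbing and leaves the real core work to its subclasses through abstract methods such as doUpdate() and doQuery(). CachingExecutor only adds caching on top of an Executor and still delegates the actual work to an underlying Executor. So the Executors that really do the work are SimpleExecutor, ReuseExecutor, and BatchExecutor. Each implements the core Executor functionality itself, without relying on any other Executor implementation, and because their implementations differ, their behavior differs as well. An Executor is bound to a SqlSession: each SqlSession has its own new Executor object, created by the Configuration.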

public interface Executor {

 

  ResultHandler NO_RESULT_HANDLER = null;

 

  int update(MappedStatement ms, Object parameter) throws SQLException;

 

  <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey cacheKey, BoundSql boundSql) throws SQLException;

 

  <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException;

 

  List<BatchResult> flushStatements() throws SQLException;

 

  void commit(boolean required) throws SQLException;

 

  void rollback(boolean required) throws SQLException;

 

  CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql);

 

  boolean isCached(MappedStatement ms, CacheKey key);

 

  void clearLocalCache();

 

  void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType);

 

  Transaction getTransaction();

 

  void close(boolean forceRollback);

 

  boolean isClosed();

 

  void setExecutorWrapper(Executor executor);

 

}
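
The claim that CachingExecutor only adds caching on top of another Executor can be pictured with a small decorator. The following is a rough sketch in plain Java, not Mybatis code: MiniExecutor, RecordingExecutor, and CachingDecorator are hypothetical names invented for illustration, and the cache here is far simpler than what Mybatis actually does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified stand-in for the Executor interface above.
interface MiniExecutor {
  int update(String sql);          // insert / update / delete
  List<String> query(String sql);  // select
}

// Stand-in for a "real" executor: it only records what reached it.
class RecordingExecutor implements MiniExecutor {
  final List<String> executed = new ArrayList<>();

  @Override
  public int update(String sql) {
    executed.add(sql);
    return 1; // pretend one row was affected
  }

  @Override
  public List<String> query(String sql) {
    executed.add(sql);
    return Collections.singletonList("row for: " + sql);
  }
}

// Decorator in the spirit of CachingExecutor: adds a cache, delegates the rest.
class CachingDecorator implements MiniExecutor {
  private final MiniExecutor delegate;
  private final Map<String, List<String>> cache = new HashMap<>();

  CachingDecorator(MiniExecutor delegate) {
    this.delegate = delegate;
  }

  @Override
  public int update(String sql) {
    cache.clear();               // a write invalidates cached reads
    return delegate.update(sql); // the wrapped executor does the work
  }

  @Override
  public List<String> query(String sql) {
    // Only a cache miss reaches the wrapped executor.
    return cache.computeIfAbsent(sql, delegate::query);
  }
}
```

Wrapping a RecordingExecutor in a CachingDecorator and running the same query twice should leave only one entry in executed, which is the sense in which the wrapper "has no real function of its own" beyond caching.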

 

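       Below is the source of BaseExecutor. It shows how BaseExecutor implements the Executor interface and which methods it leaves open for subclasses to implement.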

public abstract class BaseExecutor implements Executor {

 

  private static final Log log = LogFactory.getLog(BaseExecutor.class);

 

  protected Transaction transaction;

  protected Executor wrapper;

 

  protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;

  protected PerpetualCache localCache;

  protected PerpetualCache localOutputParameterCache;

  protected Configuration configuration;

 

  protected int queryStack = 0;

  private boolean closed;

 

  protected BaseExecutor(Configuration configuration, Transaction transaction) {

    this.transaction = transaction;

    this.deferredLoads = new ConcurrentLinkedQueue<DeferredLoad>();

    this.localCache = new PerpetualCache("LocalCache");

    this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache");

    this.closed = false;

    this.configuration = configuration;

    this.wrapper = this;

  }

 

  @Override

  public Transaction getTransaction() {

    if (closed) {

      throw new ExecutorException("Executor was closed.");

    }

    return transaction;

  }

 

  @Override

  public void close(boolean forceRollback) {

    try {

      try {

        rollback(forceRollback);

      } finally {

        if (transaction != null) {

          transaction.close();

        }

      }

    } catch (SQLException e) {

      // Ignore.  There's nothing that can be done at this point.

      log.warn("Unexpected exception on closing transaction.  Cause: " + e);

    } finally {

      transaction = null;

      deferredLoads = null;

      localCache = null;

      localOutputParameterCache = null;

      closed = true;

    }

  }

 

  @Override

  public boolean isClosed() {

    return closed;

  }

 

  @Override

  public int update(MappedStatement ms, Object parameter) throws SQLException {

    ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());

    if (closed) {

      throw new ExecutorException("Executor was closed.");

    }

    clearLocalCache();

    return doUpdate(ms, parameter);

  }

 

  @Override

  public List<BatchResult> flushStatements() throws SQLException {

    return flushStatements(false);

  }

 

  public List<BatchResult> flushStatements(boolean isRollBack) throws SQLException {

    if (closed) {

      throw new ExecutorException("Executor was closed.");

    }

    return doFlushStatements(isRollBack);

  }

 

  @Override

  public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {

    BoundSql boundSql = ms.getBoundSql(parameter);

    CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);

    return query(ms, parameter, rowBounds, resultHandler, key, boundSql);

 }

 

  @SuppressWarnings("unchecked")

  @Override

  public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {

    ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());

    if (closed) {

      throw new ExecutorException("Executor was closed.");

    }

    if (queryStack == 0 && ms.isFlushCacheRequired()) {

      clearLocalCache();

    }

    List<E> list;

    try {

      queryStack++;

      list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;

      if (list != null) {

        handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);

      } else {

        list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);

      }

    } finally {

      queryStack--;

    }

    if (queryStack == 0) {

      for (DeferredLoad deferredLoad : deferredLoads) {

        deferredLoad.load();

      }

      // issue #601

      deferredLoads.clear();

      if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {

        // issue #482

        clearLocalCache();

      }

    }

    return list;

  }

 

  @Override

  public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {

    if (closed) {

      throw new ExecutorException("Executor was closed.");

    }

    DeferredLoad deferredLoad = new DeferredLoad(resultObject, property, key, localCache, configuration, targetType);

    if (deferredLoad.canLoad()) {

      deferredLoad.load();

    } else {

      deferredLoads.add(new DeferredLoad(resultObject, property, key, localCache, configuration, targetType));

    }

  }

 

  @Override

  public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {

    if (closed) {

      throw new ExecutorException("Executor was closed.");

    }

    CacheKey cacheKey = new CacheKey();

    cacheKey.update(ms.getId());

    cacheKey.update(Integer.valueOf(rowBounds.getOffset()));

    cacheKey.update(Integer.valueOf(rowBounds.getLimit()));

    cacheKey.update(boundSql.getSql());

    List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();

    TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();

    // mimic DefaultParameterHandler logic

    for (int i = 0; i < parameterMappings.size(); i++) {

      ParameterMapping parameterMapping = parameterMappings.get(i);

      if (parameterMapping.getMode() != ParameterMode.OUT) {

        Object value;

        String propertyName = parameterMapping.getProperty();

        if (boundSql.hasAdditionalParameter(propertyName)) {

          value = boundSql.getAdditionalParameter(propertyName);

        } else if (parameterObject == null) {

          value = null;

        } else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {

          value = parameterObject;

        } else {

          MetaObject metaObject = configuration.newMetaObject(parameterObject);

          value = metaObject.getValue(propertyName);

        }

        cacheKey.update(value);

      }

    }

    if (configuration.getEnvironment() != null) {

      // issue #176

      cacheKey.update(configuration.getEnvironment().getId());

    }

    return cacheKey;

  }   

 

  @Override

  public boolean isCached(MappedStatement ms, CacheKey key) {

    return localCache.getObject(key) != null;

  }

 

  @Override

  public void commit(boolean required) throws SQLException {

    if (closed) {

      throw new ExecutorException("Cannot commit, transaction is already closed");

    }

    clearLocalCache();

    flushStatements();

    if (required) {

      transaction.commit();

    }

  }

 

  @Override

  public void rollback(boolean required) throws SQLException {

    if (!closed) {

      try {

        clearLocalCache();

        flushStatements(true);

      } finally {

        if (required) {

          transaction.rollback();

        }

      }

    }

  }

 

  @Override

  public void clearLocalCache() {

    if (!closed) {

      localCache.clear();

      localOutputParameterCache.clear();

    }

  }

 

  protected abstract int doUpdate(MappedStatement ms, Object parameter)

      throws SQLException;

 

  protected abstract List<BatchResult> doFlushStatements(boolean isRollback)

      throws SQLException;

 

  protected abstract <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)

      throws SQLException;

 

  protected void closeStatement(Statement statement) {

    if (statement != null) {

      try {

        statement.close();

      } catch (SQLException e) {

        // ignore

      }

    }

  }

 

  private void handleLocallyCachedOutputParameters(MappedStatement ms, CacheKey key, Object parameter, BoundSql boundSql) {

    if (ms.getStatementType() == StatementType.CALLABLE) {

      final Object cachedParameter = localOutputParameterCache.getObject(key);

      if (cachedParameter != null && parameter != null) {

        final MetaObject metaCachedParameter = configuration.newMetaObject(cachedParameter);

        final MetaObject metaParameter = configuration.newMetaObject(parameter);

        for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) {

          if (parameterMapping.getMode() != ParameterMode.IN) {

            final String parameterName = parameterMapping.getProperty();

            final Object cachedValue = metaCachedParameter.getValue(parameterName);

            metaParameter.setValue(parameterName, cachedValue);

          }

        }

      }

    }

  }

 

  private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {

    List<E> list;

    localCache.putObject(key, EXECUTION_PLACEHOLDER);

    try {

      list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);

    } finally {

      localCache.removeObject(key);

    }

    localCache.putObject(key, list);

    if (ms.getStatementType() == StatementType.CALLABLE) {

      localOutputParameterCache.putObject(key, parameter);

    }

    return list;

  }

 

  protected Connection getConnection(Log statementLog) throws SQLException {

    Connection connection = transaction.getConnection();

    if (statementLog.isDebugEnabled()) {

      return ConnectionLogger.newInstance(connection, statementLog, queryStack);

    } else {

      return connection;

    }

  }

 

  @Override

  public void setExecutorWrapper(Executor wrapper) {

    this.wrapper = wrapper;

  }

 

  private static class DeferredLoad {

 

    private final MetaObject resultObject;

    private final String property;

    private final Class<?> targetType;

    private final CacheKey key;

    private final PerpetualCache localCache;

    private final ObjectFactory objectFactory;

    private final ResultExtractor resultExtractor;

 

    // issue #781

    public DeferredLoad(MetaObject resultObject,

                        String property,

                        CacheKey key,

                        PerpetualCache localCache,

                        Configuration configuration,

                        Class<?> targetType) {

      this.resultObject = resultObject;

      this.property = property;

      this.key = key;

      this.localCache = localCache;

      this.objectFactory = configuration.getObjectFactory();

      this.resultExtractor = new ResultExtractor(configuration, objectFactory);

      this.targetType = targetType;

    }

 

    public boolean canLoad() {

      return localCache.getObject(key) != null && localCache.getObject(key) != EXECUTION_PLACEHOLDER;

    }

 

    public void load() {

      @SuppressWarnings( "unchecked" )

      // we suppose we get back a List

      List<Object> list = (List<Object>) localCache.getObject(key);

      Object value = resultExtractor.extractObjectFromList(list, targetType);

      resultObject.setValue(property, value);

    }

 

  }

 

}
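
The shape of BaseExecutor, with public methods doing shared bookkeeping and abstract doXxx() hooks doing the real work, is the template method pattern. The sketch below models just that idea together with a session-level cache. It is a simplified illustration under stated assumptions: TemplateExecutor and CountingExecutor are hypothetical names, and the cache key is reduced to statement id plus parameter, whereas createCacheKey() above also folds in the row bounds, the SQL text, and the environment id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the BaseExecutor pattern (not Mybatis code).
abstract class TemplateExecutor {
  // Session-level cache, keyed by a composite of the query inputs.
  private final Map<List<Object>, List<String>> localCache = new HashMap<>();
  private boolean closed;

  // Public entry point: shared bookkeeping first, then the subclass hook.
  public final int update(String statementId, Object parameter) {
    checkOpen();
    localCache.clear(); // any write invalidates the local cache
    return doUpdate(statementId, parameter);
  }

  public final List<String> query(String statementId, Object parameter) {
    checkOpen();
    List<Object> key = Arrays.asList(statementId, parameter);
    List<String> cached = localCache.get(key);
    if (cached != null) {
      return cached; // served without touching the subclass
    }
    List<String> rows = doQuery(statementId, parameter);
    localCache.put(key, rows);
    return rows;
  }

  public final void close() {
    closed = true;
  }

  private void checkOpen() {
    if (closed) {
      throw new IllegalStateException("Executor was closed.");
    }
  }

  // Hooks that concrete executors must implement.
  protected abstract int doUpdate(String statementId, Object parameter);

  protected abstract List<String> doQuery(String statementId, Object parameter);
}

// Minimal concrete subclass that counts how often the "database" is hit.
class CountingExecutor extends TemplateExecutor {
  int databaseHits;

  @Override
  protected int doUpdate(String statementId, Object parameter) {
    databaseHits++;
    return 1;
  }

  @Override
  protected List<String> doQuery(String statementId, Object parameter) {
    databaseHits++;
    List<String> rows = new ArrayList<>();
    rows.add(statementId + "(" + parameter + ")");
    return rows;
  }
}
```

With this structure a subclass only decides how a statement is run; cache clearing on update and the closed check are inherited, which is the division of labor the source above shows between BaseExecutor and its three subclasses.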

 

5.1     SimpleExecutor

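       SimpleExecutor is the Executor that Mybatis uses by default when executing Mapper statements. It provides the most basic statement execution with little extra wrapping: as the source below shows, every doUpdate() and doQuery() call prepares a new Statement and closes it in a finally block. The source of SimpleExecutor is as follows.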

public class SimpleExecutor extends BaseExecutor {

 

  public SimpleExecutor(Configuration configuration, Transaction transaction) {

    super(configuration, transaction);

  }

 

  @Override

  public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {

    Statement stmt = null;

    try {

      Configuration configuration = ms.getConfiguration();

      StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);

      stmt = prepareStatement(handler, ms.getStatementLog());

      return handler.update(stmt);

    } finally {

      closeStatement(stmt);

    }

  }

 

  @Override

  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {

    Statement stmt = null;

    try {

      Configuration configuration = ms.getConfiguration();

      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);

      stmt = prepareStatement(handler, ms.getStatementLog());

      return handler.<E>query(stmt, resultHandler);

    } finally {

      closeStatement(stmt);

    }

  }

 

  @Override

  public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {

    return Collections.emptyList();

  }

 

  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {

    Statement stmt;

    Connection connection = getConnection(statementLog);

    stmt = handler.prepare(connection);

    handler.parameterize(stmt);

    return stmt;

  }

 

}

 

5.2     ReuseExecutor

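       ReuseExecutor, as the name suggests, is an Executor that reuses things. What it reuses is Statement objects: internally it keeps every Statement it creates in a Map keyed by the SQL text. Each time it executes a SQL statement, it checks whether a Statement cached for that SQL already exists. If one exists and the Connection of that cached Statement has not been closed, it keeps using the cached Statement; otherwise it creates a new Statement and caches it. The cached Statements are closed and the Map is cleared in doFlushStatements(), which BaseExecutor calls on commit and rollback. Because each new SqlSession has a new Executor object, the Statements cached in a ReuseExecutor are scoped to a single SqlSession. The source of ReuseExecutor is below.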

public class ReuseExecutor extends BaseExecutor {

 

  private final Map<String, Statement> statementMap = new HashMap<String, Statement>();

 

  public ReuseExecutor(Configuration configuration, Transaction transaction) {

    super(configuration, transaction);

  }

 

  @Override

  public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {

    Configuration configuration = ms.getConfiguration();

    StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);

    Statement stmt = prepareStatement(handler, ms.getStatementLog());

    return handler.update(stmt);

  }

 

  @Override

  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {

    Configuration configuration = ms.getConfiguration();

    StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);

    Statement stmt = prepareStatement(handler, ms.getStatementLog());

    return handler.<E>query(stmt, resultHandler);

  }

 

  @Override

  public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {

    for (Statement stmt : statementMap.values()) {

      closeStatement(stmt);

    }

    statementMap.clear();

    return Collections.emptyList();

  }

 

  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {

    Statement stmt;

    BoundSql boundSql = handler.getBoundSql();

    String sql = boundSql.getSql();

    if (hasStatementFor(sql)) {

      stmt = getStatement(sql);

    } else {

      Connection connection = getConnection(statementLog);

      stmt = handler.prepare(connection);

      putStatement(sql, stmt);

    }

    handler.parameterize(stmt);

    return stmt;

  }

 

  private boolean hasStatementFor(String sql) {

    try {

      return statementMap.keySet().contains(sql) && !statementMap.get(sql).getConnection().isClosed();

    } catch (SQLException e) {

      return false;

    }

  }

 

  private Statement getStatement(String s) {

    return statementMap.get(s);

  }

 

  private void putStatement(String sql, Statement stmt) {

    statementMap.put(sql, stmt);

  }

 

}
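
To make the difference between the SimpleExecutor and ReuseExecutor strategies concrete, here is a small model that counts how many statements get prepared under each. It is only a sketch: FakeStatement and StatementStrategies are hypothetical names, no JDBC is involved, and the reuse check looks at the statement's own closed flag, whereas the real hasStatementFor() above checks whether the statement's Connection is closed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a JDBC Statement; it only tracks open/closed state.
class FakeStatement {
  final String sql;
  boolean closed;

  FakeStatement(String sql) {
    this.sql = sql;
  }
}

// Simplified model of the two statement-handling strategies (not Mybatis code).
class StatementStrategies {
  int prepared; // how many statements were created in total
  private final Map<String, FakeStatement> statementMap = new HashMap<>();

  // SimpleExecutor style: prepare, use, and close a statement on every call.
  void executeSimple(String sql) {
    FakeStatement stmt = prepare(sql);
    try {
      // ... bind parameters and run the statement ...
    } finally {
      stmt.closed = true;
    }
  }

  // ReuseExecutor style: look the statement up by SQL text, prepare on a miss.
  FakeStatement executeReuse(String sql) {
    FakeStatement stmt = statementMap.get(sql);
    if (stmt == null || stmt.closed) {
      stmt = prepare(sql);
      statementMap.put(sql, stmt);
    }
    // ... bind parameters and run the statement; it stays open afterwards ...
    return stmt;
  }

  // Mirrors doFlushStatements(): close and forget every cached statement.
  void flush() {
    for (FakeStatement stmt : statementMap.values()) {
      stmt.closed = true;
    }
    statementMap.clear();
  }

  private FakeStatement prepare(String sql) {
    prepared++;
    return new FakeStatement(sql);
  }
}
```

Running the same SQL twice through executeSimple() prepares two statements, while executeReuse() prepares one until flush() is called. Note that the key is the SQL text, so two statements that differ only in bound parameter values can share one cached statement.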

 

5.3     BatchExecutor

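       BatchExecutor is designed mainly for batch update operations. Under the hood it calls Statement's executeBatch() method to perform the batch. Updates are only queued when doUpdate() is called and are actually sent when the statements are flushed; a query flushes the pending statements first. The source of BatchExecutor is below.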

public class BatchExecutor extends BaseExecutor {

  public static final int BATCH_UPDATE_RETURN_VALUE = Integer.MIN_VALUE + 1002;

  private final List<Statement> statementList = new ArrayList<Statement>();
  private final List<BatchResult> batchResultList = new ArrayList<BatchResult>();
  private String currentSql;
  private MappedStatement currentStatement;

  public BatchExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
  }

  @Override
  public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
    final Configuration configuration = ms.getConfiguration();
    final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
    final BoundSql boundSql = handler.getBoundSql();
    final String sql = boundSql.getSql();
    final Statement stmt;
    if (sql.equals(currentSql) && ms.equals(currentStatement)) {
      int last = statementList.size() - 1;
      stmt = statementList.get(last);
      handler.parameterize(stmt); //fix Issues 322
      BatchResult batchResult = batchResultList.get(last);
      batchResult.addParameterObject(parameterObject);
    } else {
      Connection connection = getConnection(ms.getStatementLog());
      stmt = handler.prepare(connection);
      handler.parameterize(stmt);    //fix Issues 322
      currentSql = sql;
      currentStatement = ms;
      statementList.add(stmt);
      batchResultList.add(new BatchResult(ms, sql, parameterObject));
    }
  // handler.parameterize(stmt);
    handler.batch(stmt);
    return BATCH_UPDATE_RETURN_VALUE;
  }

  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
      throws SQLException {
    Statement stmt = null;
    try {
      flushStatements();
      Configuration configuration = ms.getConfiguration();
      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameterObject, rowBounds, resultHandler, boundSql);
      Connection connection = getConnection(ms.getStatementLog());
      stmt = handler.prepare(connection);
      handler.parameterize(stmt);
      return handler.<E>query(stmt, resultHandler);
    } finally {
      closeStatement(stmt);
    }
  }

  @Override
  public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    try {
      List<BatchResult> results = new ArrayList<BatchResult>();
      if (isRollback) {
        return Collections.emptyList();
      }
      for (int i = 0, n = statementList.size(); i < n; i++) {
        Statement stmt = statementList.get(i);
        BatchResult batchResult = batchResultList.get(i);
        try {
          batchResult.setUpdateCounts(stmt.executeBatch());
          MappedStatement ms = batchResult.getMappedStatement();
          List<Object> parameterObjects = batchResult.getParameterObjects();
          KeyGenerator keyGenerator = ms.getKeyGenerator();
          if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
            Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
            jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
          } else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141
            for (Object parameter : parameterObjects) {
              keyGenerator.processAfter(this, ms, stmt, parameter);
            }
          }
        } catch (BatchUpdateException e) {
          StringBuilder message = new StringBuilder();
          message.append(batchResult.getMappedStatement().getId())
              .append(" (batch index #")
              .append(i + 1)
              .append(")")
              .append(" failed.");
          if (i > 0) {
            message.append(" ")
                .append(i)
                .append(" prior sub executor(s) completed successfully, but will be rolled back.");
          }
          throw new BatchExecutorException(message.toString(), e, results, batchResult);
        }
        results.add(batchResult);
      }
      return results;
    } finally {
      for (Statement stmt : statementList) {
        closeStatement(stmt);
      }
      currentSql = null;
      statementList.clear();
      batchResultList.clear();
    }
  }

}
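
One detail of doUpdate() that is easy to miss is that only consecutive updates with the same SQL share a Statement; switching to a different SQL and back starts a new one. The sketch below models just that bookkeeping. MiniBatchExecutor and Batch are hypothetical names, there is no JDBC behind them, and the model compares only the SQL text, whereas the real code also compares the MappedStatement.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of BatchExecutor's bookkeeping (no JDBC involved).
class MiniBatchExecutor {
  static final int BATCH_UPDATE_RETURN_VALUE = Integer.MIN_VALUE + 1002;

  // One entry per prepared statement: the SQL plus the parameters queued on it.
  static class Batch {
    final String sql;
    final List<Object> parameters = new ArrayList<>();

    Batch(String sql) {
      this.sql = sql;
    }
  }

  private final List<Batch> batches = new ArrayList<>();
  private String currentSql;

  // Queue an update. Only consecutive calls with the same SQL share a batch.
  int update(String sql, Object parameter) {
    Batch batch;
    if (sql.equals(currentSql)) {
      batch = batches.get(batches.size() - 1);
    } else {
      batch = new Batch(sql);
      batches.add(batch);
      currentSql = sql;
    }
    // Stands in for handler.parameterize(stmt) followed by handler.batch(stmt).
    batch.parameters.add(parameter);
    return BATCH_UPDATE_RETURN_VALUE; // the real row count is not known yet
  }

  // "Execute" every queued batch in order and return the per-batch sizes.
  List<Integer> flush() {
    List<Integer> sizes = new ArrayList<>();
    for (Batch batch : batches) {
      sizes.add(batch.parameters.size()); // stands in for stmt.executeBatch()
    }
    batches.clear();
    currentSql = null;
    return sizes;
  }
}
```

In this model, queuing insert, insert, delete, insert yields three batches of sizes 2, 1, and 1 rather than two, so grouping identical statements together before calling update is what keeps the number of underlying statements low. The placeholder return value also shows why the result of an update on a batch executor should not be read as an affected-row count.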
 

 

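5.4     Choosing an Executor

       Given that BaseExecutor has SimpleExecutor, ReuseExecutor, and BatchExecutor beneath it, and that Executor also has the CachingExecutor implementation, how do we choose which Executor to use? By default the Mybatis global setting cacheEnabled is "true", which means that by default a CachingExecutor wraps the Executor that actually does the work; this will be covered in a later article on Mybatis caching. So how is the underlying BaseExecutor determined? It is determined when the SqlSession is created. Every SqlSession is created through SqlSessionFactory's openSession(), and SqlSessionFactory offers a family of openSession() methods.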

public interface SqlSessionFactory {

 

  SqlSession openSession();

 

  SqlSession openSession(boolean autoCommit);

  SqlSession openSession(Connection connection);

  SqlSession openSession(TransactionIsolationLevel level);

 

  SqlSession openSession(ExecutorType execType);

  SqlSession openSession(ExecutorType execType, boolean autoCommit);

  SqlSession openSession(ExecutorType execType, TransactionIsolationLevel level);

  SqlSession openSession(ExecutorType execType, Connection connection);

 

  Configuration getConfiguration();

 

}

 

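       Judging from the methods above, SqlSessionFactory offers two kinds of methods for creating a SqlSession: those that do not specify an ExecutorType and those that do. Clearly, when an ExecutorType is specified, the Executor of that type is used. ExecutorType is an enum with three constants: SIMPLE, REUSE, and BATCH.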

 

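5.4.1  The Default Executor

       When no ExecutorType is specified, the default Executor is used. Mybatis's default Executor is SimpleExecutor, and it can be changed through the Mybatis global setting defaultExecutorType, whose possible values are likewise SIMPLE, REUSE, and BATCH.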

      <setting name="defaultExecutorType" value="SIMPLE"/>
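
The selection rules described in this section (an explicit ExecutorType wins, otherwise the configured default applies, and a caching wrapper is added when caching is enabled) can be modelled roughly as follows. This is a sketch of the decision only, under the assumption that it matches what Configuration does when it creates an Executor; MiniExecutorType, ChosenExecutor, and MiniConfiguration are hypothetical names and no real executors are constructed.

```java
// Hypothetical mirror of the three ExecutorType constants.
enum MiniExecutorType { SIMPLE, REUSE, BATCH }

// Describes the outcome of the selection instead of building real executors.
class ChosenExecutor {
  final MiniExecutorType type;   // which base executor does the real work
  final boolean cachingWrapper;  // whether a caching decorator wraps it

  ChosenExecutor(MiniExecutorType type, boolean cachingWrapper) {
    this.type = type;
    this.cachingWrapper = cachingWrapper;
  }
}

// Simplified stand-in for the relevant part of the global configuration.
class MiniConfiguration {
  MiniExecutorType defaultExecutorType = MiniExecutorType.SIMPLE; // defaultExecutorType setting
  boolean cacheEnabled = true;                                    // cacheEnabled setting

  ChosenExecutor newExecutor(MiniExecutorType requested) {
    // 1. An explicitly requested type wins; otherwise use the configured default.
    MiniExecutorType type = requested != null ? requested : defaultExecutorType;
    // 2. If nothing is configured either, fall back to SIMPLE.
    if (type == null) {
      type = MiniExecutorType.SIMPLE;
    }
    // 3. Caching is layered on top and is independent of the chosen type.
    return new ChosenExecutor(type, cacheEnabled);
  }
}
```

Calling newExecutor(null) on a fresh MiniConfiguration corresponds to openSession() without an ExecutorType, and newExecutor(MiniExecutorType.BATCH) corresponds to openSession(ExecutorType.BATCH).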

 

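       Note that when Mybatis is integrated with Spring, the Mapper objects that Spring generates through scanning all use a SqlSession backed by the default Executor. If we need a non-default Executor in our program, we can declare a SqlSessionFactoryBean in the Spring bean container, inject the SqlSessionFactory into the class that needs a specific Executor, and use that SqlSessionFactory to create a SqlSession with the desired ExecutorType.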

 

 

References

       Mybatis source code

(Note: this article is based on Mybatis 3.3.1 and was written on Saturday, December 24, 2016.)