Mybatis 缓存实现原理

接下来,本文将介绍 Mybatis 缓存的实现原理,具体分析以下几个问题java

  • 一级缓存和二级缓存具体存放在哪
  • 如何判断存在二级缓存
  • 二级缓存事务性如何实现

二级缓存的实现类是在何时建立的?

解析 Mapper 配置文件,或者解析标注有 CacheNamespace 注解 Mapper 的时候,均会调用下面的方法。
由 MapperBuilderAssistant 建立缓存
org.apache.ibatis.builder.MapperBuilderAssistant#useNewCacheweb

public Cache useNewCache(Class<? extends Cache> typeClass,
      Class<? extends Cache> evictionClass,
      Long flushInterval,
      Integer size,
      boolean readWrite,
      boolean blocking,
      Properties props) {
    Cache cache = new CacheBuilder(currentNamespace)
        .implementation(valueOrDefault(typeClass, PerpetualCache.class))
        .addDecorator(valueOrDefault(evictionClass, LruCache.class))
        .clearInterval(flushInterval)
        .size(size)
        .readWrite(readWrite)
        .blocking(blocking)
        .properties(props)
        .build();
    configuration.addCache(cache);
    currentCache = cache;
    return cache;
  }

能够看到 Mybatis 使用了装饰者模式,默认使用 PerpetualCache 做为缓存,默认使用 Lru 缓存策略。数据库

public Cache build() {
    setDefaultImplementations();
    Cache cache = newBaseCacheInstance(implementation, id);
    setCacheProperties(cache);
    // issue #352, do not apply decorators to custom caches
    if (PerpetualCache.class.equals(cache.getClass())) {
      for (Class<? extends Cache> decorator : decorators) {
        cache = newCacheDecoratorInstance(decorator, cache);
        setCacheProperties(cache);
      }
      cache = setStandardDecorators(cache);
    } else if (!LoggingCache.class.isAssignableFrom(cache.getClass())) {
      cache = new LoggingCache(cache);
    }
    return cache;
  }

步骤1:设置默认实现类(PerpetualCache+LruCache)
步骤2:按照 ID 建立 Cache(ID为 Mapper 接口的全类名)
步骤3:设置缓存的参数
步骤4:自定义的缓存不该用标准的装饰器(定时刷新、只读、同步、阻塞等须要本身实现),仅包装一层日志
步骤5:设置标准的装饰器apache

private Cache setStandardDecorators(Cache cache) {
    try {
      MetaObject metaCache = SystemMetaObject.forObject(cache);
      if (size != null && metaCache.hasSetter("size")) {
        metaCache.setValue("size", size);
      }
      if (clearInterval != null) {
        cache = new ScheduledCache(cache);
        ((ScheduledCache) cache).setClearInterval(clearInterval);
      }
      if (readWrite) {
        cache = new SerializedCache(cache);
      }
      cache = new LoggingCache(cache);
      // 在获取更新缓存时防止并发
      cache = new SynchronizedCache(cache);
      if (blocking) {
        // 当在缓存中找不到时,阻塞进程
        cache = new BlockingCache(cache);
      }
      return cache;
    } catch (Exception e) {
      throw new CacheException("Error building standard cache decorators. Cause: " + e, e);
    }
  }

调用 CachingExecutor 执行SQL语句

开启了全局缓存,建立 SqlSession 时就会建立一个 CachingExecutor。缓存

提一句:开启一个 SqlSession 能够配置默认的执行器。SIMPLE 就是普通的执行器;REUSE 执行器会重用预处理语句(PreparedStatement); BATCH 执行器不只重用语句还会执行批量更新。并发

public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
    executorType = executorType == null ? defaultExecutorType : executorType;
    executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
    Executor executor;
    if (ExecutorType.BATCH == executorType) {
      executor = new BatchExecutor(this, transaction);
    } else if (ExecutorType.REUSE == executorType) {
      executor = new ReuseExecutor(this, transaction);
    } else {
      executor = new SimpleExecutor(this, transaction);
    }
    if (cacheEnabled) {
      executor = new CachingExecutor(executor);
    }
    executor = (Executor) interceptorChain.pluginAll(executor);
    return executor;
  }

若是自定义了 executor 插件,就对 executor 进行代理。app

无论 executor 有多少层代理,最终执行查询 SQL 语句的时候,都会调用以下的方法
org.apache.ibatis.executor.CachingExecutor#query(…)ide

@Override
  public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    // 获取SQL语句
    BoundSql boundSql = ms.getBoundSql(parameterObject);
    // 根据SQL语句建立 CacheKey ,做为缓存的主键,以后按照这个主键去缓存中查找。
    CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
    return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
  }

缓存的主键是如何建立的呢,看下面的方法。svg

@Override
  public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    CacheKey cacheKey = new CacheKey();
    cacheKey.update(ms.getId());
    cacheKey.update(rowBounds.getOffset());
    cacheKey.update(rowBounds.getLimit());
    cacheKey.update(boundSql.getSql());
    List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
    TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
    // mimic DefaultParameterHandler logic
    for (ParameterMapping parameterMapping : parameterMappings) {
      if (parameterMapping.getMode() != ParameterMode.OUT) {
        Object value;
        String propertyName = parameterMapping.getProperty();
        if (boundSql.hasAdditionalParameter(propertyName)) {
          value = boundSql.getAdditionalParameter(propertyName);
        } else if (parameterObject == null) {
          value = null;
        } else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
          value = parameterObject;
        } else {
          MetaObject metaObject = configuration.newMetaObject(parameterObject);
          value = metaObject.getValue(propertyName);
        }
        cacheKey.update(value);
      }
    }
    if (configuration.getEnvironment() != null) {
      // issue #176
      cacheKey.update(configuration.getEnvironment().getId());
    }
    return cacheKey;
  }

缓存的 KEY 的建立很简单,以以下信息拼接而成:SQL 方法名称+分页信息+具体的SQL+具体的输入参数+执行环境
打印 CacheKey.toString(),示例输出以下(以:隔开各个元素):ui

931220670:1409391160:org.apache.ibatis.autoconstructor.AutoConstructorMapper.getSubject:0:2147483647:SELECT * FROM subject WHERE id = ?:1:development

获取缓存

进行二级缓存判断

此方法是实现二级缓存的关键

@Override
  public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
      throws SQLException {
    Cache cache = ms.getCache();
    if (cache != null) {
      flushCacheIfRequired(ms);
      if (ms.isUseCache() && resultHandler == null) {
        ensureNoOutParams(ms, boundSql);
        @SuppressWarnings("unchecked")
        List<E> list = (List<E>) tcm.getObject(cache, key);
        if (list == null) {
          list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
          tcm.putObject(cache, key, list); // issue #578 and #116
        }
        return list;
      }
    }
    return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
  }

步骤一:经过 MappedStatement 取二级缓存,由此可知二缓存默认存储在 MappedStatement 中。若是不存在,调用 delegate.query() 从一级缓存中取。
步骤二:判断是否须要刷新缓存,是的话须要清空缓存。可能有其余链接执行了修改或者删除,亦或标记了刷新缓存注解 @Options(flushCache = FlushCachePolicy.TRUE)
步骤三:判断是不是 CallableStatement ,不支持使用 OUT 参数缓存存储过程。
步骤四:从事务缓存管理中取缓存,此处以后具体分析。
步骤五:取到缓存直接返回,不执行 SQL,不存在调用 delegate.query() ,并缓存结果到缓存。

进行二级缓存读取

上面有一个很关键的方法 tcm.getObject(cache, key) ,两个参数,一个是缓存 cache,一个是 key,经过事务缓存管理以 key 从 cache 中读取数据。

public class TransactionalCacheManager {

  private final Map<Cache, TransactionalCache> transactionalCaches = new HashMap<>();
 
  public Object getObject(Cache cache, CacheKey key) {
    return getTransactionalCache(cache).getObject(key);
  }
  
  private TransactionalCache getTransactionalCache(Cache cache) {
    return transactionalCaches.computeIfAbsent(cache, TransactionalCache::new);
  }
  ......
}

经过 TransactionalCache 获取 key。
org.apache.ibatis.cache.decorators.TransactionalCache#getObject

@Override
  public Object getObject(Object key) {
    // issue #116
    Object object = delegate.getObject(key);
    if (object == null) {
      entriesMissedInCache.add(key);
    }
    // issue #146
    if (clearOnCommit) {
      return null;
    } else {
      return object;
    }
  }

每个二级缓存,均有一个 TransactionalCache(二级缓存的事务buffer) 对应。而后依次调用代理的缓存实现,直到最底层的缓存实现类 PerpetualCache。
在这里插入图片描述
以下图的调用栈:
在这里插入图片描述
因以前有链接查询过,这里会有缓存结果。

进行一级缓存读取

若是二级缓存获取不到数据,再进入 delegate.query 方法,调用 BaseExecutor 获取一级缓存

public abstract class BaseExecutor implements Executor {
	protected PerpetualCache localCache = new PerpetualCache("LocalCache");
	......
}
@Override
  public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    if (queryStack == 0 && ms.isFlushCacheRequired()) {
      clearLocalCache();
    }
    List<E> list;
    try {
      queryStack++;
      list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
      if (list != null) {
        handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
      } else {
        list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
      }
    } finally {
      queryStack--;
    }
    if (queryStack == 0) {
      for (DeferredLoad deferredLoad : deferredLoads) {
        deferredLoad.load();
      }
      // issue #601
      deferredLoads.clear();
      if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
        // issue #482
        clearLocalCache();
      }
    }
    return list;
  }
  
  private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    List<E> list;
    localCache.putObject(key, EXECUTION_PLACEHOLDER);
    try {
      list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
    } finally {
      localCache.removeObject(key);
    }
    localCache.putObject(key, list);
    if (ms.getStatementType() == StatementType.CALLABLE) {
      localOutputParameterCache.putObject(key, parameter);
    }
    return list;
  }

实现逻辑,你们本身看吧。

更新二级缓存

无论是修改,仍是删除,在调用 Executor 前都会先清空二级缓存(除非方法上标记不刷缓存)
org.apache.ibatis.executor.CachingExecutor#update

@Override
  public int update(MappedStatement ms, Object parameterObject) throws SQLException {
    flushCacheIfRequired(ms);
    return delegate.update(ms, parameterObject);
  }
  
  private void flushCacheIfRequired(MappedStatement ms) {
    Cache cache = ms.getCache();
    if (cache != null && ms.isFlushCacheRequired()) {
      tcm.clear(cache);
    }
  }

最终调用 TransactionalCache,标记事务结束的时候,须要清空真正的缓存,这里只是清空待提交的查询结果缓存。

@Override
  public void clear() {
    clearOnCommit = true;
    entriesToAddOnCommit.clear();
  }

调用 Executor 时,执行SQL前,再清空一级缓存
org.apache.ibatis.executor.BaseExecutor#update

@Override
  public int update(MappedStatement ms, Object parameter) throws SQLException {
    ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    clearLocalCache();
    return doUpdate(ms, parameter);
  }

事务结束后,判断是 commit 仍是 rollback。
若是是 commit ,先判断是否要清空待提交的查询结果缓存,再将待提交的查询结果缓存写入到真正的缓存 PerpetualCache 中。

public void commit() {
    if (clearOnCommit) {
      delegate.clear();
    }
    flushPendingEntries();
    reset();
  }
  
  private void flushPendingEntries() {
    for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) {
      delegate.putObject(entry.getKey(), entry.getValue());
    }
    for (Object entry : entriesMissedInCache) {
      if (!entriesToAddOnCommit.containsKey(entry)) {
      	// 无缓存提交,仍是要执行,保证唤醒其余线程
        delegate.putObject(entry, null);
      }
    }
  }

若是是 rollback ,将这个事务管理的多个 cache 所有清空。

public void rollback() {
    for (TransactionalCache txCache : transactionalCaches.values()) {
      txCache.rollback();
    }
  }
  
  public void rollback() {
    unlockMissedEntries();
    reset();
  }
  
  private void unlockMissedEntries() {
    for (Object entry : entriesMissedInCache) {
      try {
      	// 清空缓存,失效 key,其余线程可从新从数据库查询
        delegate.removeObject(entry);
      } catch (Exception e) {
        ...
      }
    }
  }
  
  private void reset() {
    clearOnCommit = false;
    entriesToAddOnCommit.clear();
    entriesMissedInCache.clear();
  }

思考几个问题

问题1

若是一个事务修改了表,以后又查询了表,事务正常commit以后,缓存还存在吗?

存在。修改会清空 entriesToAddOnCommit,而后标记须要清空真正的缓存;
接下来,查询会缓存结果到 entriesToAddOnCommit;
事务 commit 时会清空 delegate,以后又会将 entriesToAddOnCommit 写入到 delegate,最后清空 entriesToAddOnCommit,还原状态 clearOnCommit 。

问题2

你们可能看到了 TransactionalCache 中还有一个属性 entriesMissedInCache,它是干吗的呢?

它是为了支持 BlockingCache 用的,若是一个线程1调用查询获取不到数据,BlockingCache 会阻塞其余线程,直到线程1获取到结果放入到缓存,这样避免了当获取不到数据时其余线程所有从数据库读数据。
线程1获取到数据后,会释放 key 的重入锁,若是获取不到数据呢,其余线程依旧阻塞吗?不是的,若是当前线程1的事务结束了,是必定要释放 key 的重入锁的,这时须要唤醒其余线程。
看 org.apache.ibatis.cache.decorators.TransactionalCache#getObject 方法

@Override
  public Object getObject(Object key) {
    // issue #116
    Object object = delegate.getObject(key);
    if (object == null) {
      entriesMissedInCache.add(key);
    }
    // issue #146
    if (clearOnCommit) {
      return null;
    } else {
      return object;
    }
  }

获取不到记录,记录此查询的 key。方便事务提交的时候,释放此 key 的锁。
事务结束,执行 flushPendingEntries 或者 unlockMissedEntries 释放锁。

再思考一个问题

若是线程1正常查询,无修改操做,其余线程也无修改,但获取不到值,会执行 flushPendingEntries 方法中的 delegate.putObject(entry, null) 吗?这样唤醒其余线程,其余线程仍是会回到数据库执行查询,而后调用 delegate.putObject(entry, null) 吗?这样岂不是 BlockingCache 严重影响了查询效率?

不会,查询返回值不会为空,必定会返回一个 List 对象,保证缓存一个值。delegate.putObject(entry, null) 只是为了保证唤醒线程。

文章开头问题答案

最后回答开头提出的几个问题。

问题1:一级缓存和二级缓存具体存放在哪

  • 一级缓存存放在 BaseExecutor 中,每开启一个 SQLSession 均会建立一个 Executor,所以没法和其余 Session 互通;
  • 二级缓存具体存放在 MappedStatement 中,一个 SQL 语句对应一个 MappedStatement ,同一个命名空间(org.apache.ibatis.autoconstructor.AutoConstructorMapper)若不混用 java API 或者 XML,各个 MappedStatement 使用的是同一个缓存对象,只是 key 各不相同。

问题2:如何判断存在二级缓存

这个简单,经过查询语句,按照固定的格式拼接 key,拿这个 key 从 MappedStatement 获取缓存,获取获得即存在缓存。

问题3:二级缓存事务性如何实现

定义一个事务管理的缓存Buffer TransactionalCache,维护一个 Map<Object, Object> entriesToAddOnCommit 对象 和一个 clearOnCommit 清空缓存标识; entriesToAddOnCommit 中 key 为拼接的查询语句的那个 key,value 为事务结束前 key 的查询结果; 根据事务的结束状态,判断事务的查询缓存是否须要提交,以及已存在的二级缓存是否须要失效。