spring boot 之 多级缓存实现
一、为什么使用多级缓存
缓存的使用是解决高并发问题的一个重要途径,所以缓存很重要。一般情况下使用本地缓存,如ehcache,guava等就可以了,但是针对分布式、集群架构,本地缓存无法做到相互之间数据保持一致,如果使用redis缓存,则需要不断的去连接redis,这个中间也是有一定的资源消耗,在并发较小的时候这些消耗不影响系统,但是并发较大时就可能导致无法获取到redis连接,从而导致系统奔溃等等问题。
二、搭建自己的二级缓存
要想搭建自己的二级缓存平台就需要了解spring boot的cache实现机制,cache的实现核心就是org.springframework.cache.Cache接口和org.springframework.cache.CacheManager接口,前者是缓存类,后者是管理缓存的,所以只要我们重写了这两个类,那么就可以实现缓存的重写。
1.重写cache
cache提供了一下方法
String getName();
Object getNativeCache();
Cache.ValueWrapper get(Object var1);
<T> T get(Object var1, Class<T> var2);
<T> T get(Object var1, Callable<T> var2);
void put(Object var1, Object var2);
Cache.ValueWrapper putIfAbsent(Object var1, Object var2);
void evict(Object var1);
void clear();
这里我们只需要重写这些方法就行了,由于我们要实现二级缓存,所以要引入本地缓存和redis 缓存,也就是我们只需要在自定义的cache中实现本地缓存和redis 缓存同时使用就可以了。具体实现如下:
package com.wangcongming.cache.core.layering;
import com.alibaba.fastjson.JSON;
import com.wangcongming.cache.core.redis.cache.CustomizedRedisCache;
import com.wangcongming.cache.core.redis.listener.ChannelTopicEnum;
import com.wangcongming.cache.core.redis.listener.RedisPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.caffeine.CaffeineCache;
import org.springframework.cache.support.AbstractValueAdaptingCache;
import org.springframework.cache.support.NullValue;
import org.springframework.data.redis.core.RedisOperations;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
/**
* @author wangcongming
* @Package com.wangcongming.cache.core.layering
* @Description: 双缓存类 代码来源于网络,并在此基础做了修改
* @date 2018/5/18 15:16
*/
public class LayeringCache extends AbstractValueAdaptingCache {
Logger logger = LoggerFactory.getLogger(LayeringCache.class);
/**
* 缓存的名称
*/
private final String name;
/**
* 是否使用一级缓存
*/
private boolean usedFirstCache = true;
/**
* redi缓存
*/
private final CustomizedRedisCache redisCache;
/**
* Caffeine缓存
*/
private final CaffeineCache caffeineCache;
RedisOperations<? extends Object, ? extends Object> redisOperations;
/**
* @param name 缓存名称
* @param prefix 缓存前缀
* @param redisOperations 操作Redis的RedisTemplate
* @param expiration redis缓存过期时间
* @param allowNullValues 是否允许存NULL,默认是false
* @param usedFirstCache 是否使用一级缓存,默认是true
* @param caffeineCache Caffeine缓存
*/
public LayeringCache(String name, byte[] prefix, RedisOperations<? extends Object, ? extends Object> redisOperations,
long expiration, boolean allowNullValues, boolean usedFirstCache,
com.github.benmanes.caffeine.cache.Cache<Object, Object> caffeineCache) {
super(allowNullValues);
this.name = name;
this.usedFirstCache = usedFirstCache;
this.redisOperations = redisOperations;
this.redisCache = new CustomizedRedisCache(name, prefix, redisOperations, expiration, allowNullValues);
this.caffeineCache = new CaffeineCache(name, caffeineCache, allowNullValues);
}
@Override
public String getName() {
return this.name;
}
@Override
public Object getNativeCache() {
return this;
}
public CustomizedRedisCache getSecondaryCache() {
return this.redisCache;
}
public CaffeineCache getFirstCache() {
return this.caffeineCache;
}
@Override
public ValueWrapper get(Object key) {
ValueWrapper wrapper = null;
if (usedFirstCache) {
// 查询一级缓存
wrapper = caffeineCache.get(key);
logger.debug("查询一级缓存 key:{},返回值是:{}", key, JSON.toJSONString(wrapper));
}
if (wrapper == null) {
// 查询二级缓存
wrapper = redisCache.get(key);
// caffeineCache.put(key, wrapper == null ? null : JSON.toJSONString(wrapper));
logger.debug("查询二级缓存,并将数据放到一级缓存。 key:{},返回值是:{}", key, JSON.toJSONString(wrapper));
}
return wrapper;
}
@Override
public <T> T get(Object key, Class<T> type) {
T value = null;
if (usedFirstCache) {
// 查询一级缓存
value = caffeineCache.get(key, type);
logger.debug("查询一级缓存 key:{},返回值是:{}", key, JSON.toJSONString(value));
}
if (value == null) {
// 查询二级缓存
value = redisCache.get(key, type);
caffeineCache.put(key, value);
logger.debug("查询二级缓存,并将数据放到一级缓存。 key:{},返回值是:{}", key, JSON.toJSONString(value));
}
return value;
}
@SuppressWarnings("unchecked")
@Override
public <T> T get(Object key, Callable<T> valueLoader) {
T value = null;
if (usedFirstCache) {
// Object o = caffeineCache.get(key).get();
// 查询一级缓存,如果一级缓存没有值则调用getForSecondaryCache(k, valueLoader)查询二级缓存
value = (T) caffeineCache.getNativeCache().get(key, k -> getForSecondaryCache(k, valueLoader));
// this.put(key,value);
} else {
// 直接查询二级缓存
value = (T) getForSecondaryCache(key, valueLoader);
}
if (value instanceof NullValue) {
return null;
}
return value;
}
@Override
public void put(Object key, Object value) {
if (usedFirstCache) {
caffeineCache.put(key, value);
}
redisCache.put(key, value);
}
@Override
public ValueWrapper putIfAbsent(Object key, Object value) {
if (usedFirstCache) {
caffeineCache.putIfAbsent(key, value);
}
return redisCache.putIfAbsent(key, value);
}
@Override
public void evict(Object key) {
// 删除的时候要先删除二级缓存再删除一级缓存,否则有并发问题
redisCache.evict(key);
if (usedFirstCache) {
// 删除一级缓存需要用到redis的Pub/Sub(订阅/发布)模式,否则集群中其他服服务器节点的一级缓存数据无法删除
Map<String, Object> message = new HashMap<>();
message.put("cacheName", name);
message.put("key", key);
// 创建redis发布者
RedisPublisher redisPublisher = new RedisPublisher(redisOperations, ChannelTopicEnum.REDIS_CACHE_DELETE_TOPIC.getChannelTopic());
// 发布消息
redisPublisher.publisher(message);
}
}
@Override
public void clear() {
redisCache.clear();
if (usedFirstCache) {
// 清除一级缓存需要用到redis的订阅/发布模式,否则集群中其他服服务器节点的一级缓存数据无法删除
Map<String, Object> message = new HashMap<>();
message.put("cacheName", name);
// 创建redis发布者
RedisPublisher redisPublisher = new RedisPublisher(redisOperations, ChannelTopicEnum.REDIS_CACHE_CLEAR_TOPIC.getChannelTopic());
// 发布消息
redisPublisher.publisher(message);
}
}
@Override
protected Object lookup(Object key) {
Object value = null;
if (usedFirstCache) {
value = caffeineCache.get(key);
logger.debug("查询一级缓存 key:{},返回值是:{}", key, JSON.toJSONString(value));
}
if (value == null) {
value = redisCache.get(key);
logger.debug("查询二级缓存 key:{},返回值是:{}", key, JSON.toJSONString(value));
}
return value;
}
/**
* 查询二级缓存
*
* @param key
* @param valueLoader
* @return
*/
private <T> Object getForSecondaryCache(Object key, Callable<T> valueLoader) {
T value = redisCache.get(key, valueLoader);
logger.debug("查询二级缓存 key:{},返回值是:{}", key, JSON.toJSONString(value));
return toStoreValue(value);
}
}
从上述代码我们可以看到,在LayeringCache的构造方法中我们直接创建了CaffeineCache和redisCache(这里的rediscache进行了重写),然后在get方法中,先去取本地缓存,没有才会去查询redisCache
现在缓存已经有了,那么怎么去实现缓存的创建呢,这里就要使用cachemanager来实现缓存的管理。
2.重写CacheManager
cachemanager是来管理缓存的,他负责cache的创建
由于我们重写了cache,那么原有的cachemanager就无法再创建此cache,所以需要重写。
首先我们来看cachemanager的具体方法有哪些
package org.springframework.cache;
import java.util.Collection;
public interface CacheManager {
Cache getCache(String var1);
Collection<String> getCacheNames();
}
我们可以看到cachemanager接口只提供了两个方法,一个是获取cache,一个是获取所有cache name。
也就是我们只需要重写这两个方法就可以了
具体实现如下:
private final ConcurrentMap<String, Cache> cacheMap = new ConcurrentHashMap<String, Cache>(16);
private Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder()
.expireAfterAccess(DEFAULT_EXPIRE_AFTER_WRITE, TimeUnit.HOURS)
.initialCapacity(DEFAULT_INITIAL_CAPACITY)
.maximumSize(DEFAULT_MAXIMUM_SIZE);
public LayeringCacheManager(RedisOperations redisOperations) {
this(redisOperations, Collections.<String>emptyList());
}
public LayeringCacheManager(RedisOperations redisOperations, Collection<String> cacheNames) {
this(redisOperations, cacheNames, false);
}
public LayeringCacheManager(RedisOperations redisOperations, Collection<String> cacheNames, boolean allowNullValues) {
this.allowNullValues = allowNullValues;
this.redisOperations = redisOperations;
setCacheNames(cacheNames);
}
@Override
public Cache getCache(String name) {
Cache cache = this.cacheMap.get(name);
if (cache == null && this.dynamic) {
synchronized (this.cacheMap) {
cache = this.cacheMap.get(name);
if (cache == null) {
cache = createCache(name);
this.cacheMap.put(name, cache);
}
}
}
return cache;
}
protected Cache createCache(String name) {
return new LayeringCache(name, (usePrefix ? cachePrefix.prefix(name) : null), redisOperations,
getSecondaryCacheExpirationSecondTime(name), isAllowNullValues(), getUsedFirstCache(name),
createNativeCaffeineCache(name));
}
@Override
public Collection<String> getCacheNames() {
return Collections.unmodifiableSet(this.cacheMap.keySet());
}
如此我们就实现了二级缓存注:部分代码来源于网络
推荐阅读
-
spring-boot整合ehcache实现缓存机制的方法
-
使用 Spring Boot 2.0 + WebFlux 实现 RESTful API功能
-
spring boot2.0实现优雅停机的方法
-
spring boot 自定义starter的实现教程
-
详解Spring Boot实战之Rest接口开发及数据库基本操作
-
详解在spring boot中消息推送系统设计与实现
-
spring boot整合spring-kafka实现发送接收消息实例代码
-
spring boot实现上传图片并在页面上显示及遇到的问题小结
-
详解Spring Boot实战之Filter实现使用JWT进行接口认证
-
详解Spring Boot实战之单元测试