基于SpringBoot Redis Cache,封装一个能够批量操作(查询&保存)的缓存
之前的文章:
Spring Cache的使用教程:注解形式和api接口形式
Guava 本地缓存使用教程
在这前面两篇文章中,分别介绍了Spring Cache 和 Guava Cache 的使用,然后一对比就可以发现,Guava Cache 提供了批量查询的接口,而Spring Cache 只有单个查询的接口。
那当我们在使用Spring Cache的时候,需要用到批量查询缓存的时候应该怎么办呢?这个时候就有小伙伴觉得,直接写一个 for 循环,一个一个查不就行了。但这样的话性能就大打折扣了,虽然一般都会用到 Redis 连接池,每次查询不需要建立新的tcp连接(tcp连接的过程不知道大家还记不记得,(传送门网络基础知识) (为什么我要说这个呢?因为我之前面试就在这踩过坑) 但还是要进行多次数据传输的,仍然需要耗费大量的网络传输的时间。
因此我们需要自己封装一个能够批量操作的缓存。了解Redis的小伙伴都知道,Redis有一个pipeline 批量执行操作的功能,我们这里就主要通过 pipeline 实现,当然大家也可以尝试使用 mget 或 mset 命令。
注意:该篇文章主要以代码展示,部分细节可以看注释。且代码已经过测试
准备工作
实现思路
对Spring Cache 封装主要目的有两个,
- 对 cache 的操作进行try-cache,避免因缓存不可用导致服务不可用
- 添加对缓存的批量操作
因为批量查询时会将不存在的数据批量存入缓存中,所以只需要理清如何实现批量查询的逻辑就可以了
批量查询实现思路:首先是根据需要查询的 keys 集合,使用
RedisTemplate.executePipelined
方法将所有的缓存数据查询出来,存入一个List对象中。然后对list进行遍历,取出非null的json对象(存在缓存数据),将其转成目标对象(缓存数据类型),而对于为null的部分,记录对应的key 放在 nkeys 中,然后使用传入的加载数据的接口,加载这些缓存中不存在的数据。最后,将这些数据转化成map类型,再根据key-value使用pipeline批量存入redis缓存中。
简单一点就是:先批量查缓存 -> 找到缓存中不存的的那些key -> 加载对应的数据 -> 转成map批量保存到缓存中
添加配置
- 添加依赖
<!--spring cache-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
<version>2.1.5.RELEASE</version>
</dependency>
<!--redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.1.5.RELEASE</version>
</dependency>
<!--fastjson 用于序列化-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.70</version>
</dependency>
<!--redis所需的连接池-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.2</version>
</dependency>
- 配置 Redis 服务器
spring:
redis:
host: 127.0.0.1
port: 6379
timeout: 5000
lettuce:
pool:
min-idle: 1
代码展示
封装 RedisMultiCache.java
/**
* 对 spring cache (redis实现) 进行了一层封装,主要包括以下两个
* <ul>
* <li>对默认的Cache方法进行的一层封装,主要进行了try-cache处理,因为不希望因为读取缓存失败导致整个流程报错</li>
* <li>添加了批量查询和存入缓存的操作</li>
* </ul>
*/
public class RedisMultiCache implements Cache {
private static final Logger log = LoggerFactory.getLogger(RedisMultiCache.class);
/**
* spring redis cache,
*/
private Cache cache;
private RedisTemplate redisTemplate;
/**
* 默认不清除原有缓存
*/
public RedisMultiCache(Cache cache, RedisTemplate redisTemplate) {
this(cache, redisTemplate, false);
}
/**
* @param cache spring cache
* @param redisTemplate 用于进行缓存的批量操作
* @param clearExist 是否初始化缓存(清除redis中已经存在的缓存数据)
*/
public RedisMultiCache(Cache cache, RedisTemplate redisTemplate, boolean clearExist) {
this.cache = cache;
this.redisTemplate = redisTemplate;
// 创建缓存前, 是否初始化缓存,清除原有的
if (clearExist) {
cache.clear();
}
}
@Override
public String getName() {
return cache.getName();
}
@Override
public Object getNativeCache() {
return cache.getNativeCache();
}
@Override
public ValueWrapper get(Object key) {
try {
return cache.get(key);
} catch (Exception e) {
log.error("RedisMultiCache 异常", e);
}
return null;
}
@Override
public <T> T get(Object key, Class<T> aClass) {
try {
return cache.get(key, aClass);
} catch (Exception e) {
log.error("RedisMultiCache 异常", e);
}
return null;
}
/**
* 获取不为空的缓存,如果缓存中存在为null,则使用 valueLoader 重新加载,并将结果存入缓存中
* @param key 缓存数据的key
* @param valueLoader 加载缓存的方法
* @return {@code null} if valueLoader returned null
* @param <T> 缓存数据类型
*/
public <T> T getNonNull(Object key, Callable<T> valueLoader) {
T value;
try {
ValueWrapper wrapper = cache.get(key);
if (wrapper == null || (value = (T) wrapper.get()) == null) {
value = valueLoader.call();
cache.put(key, value);
}
} catch (Exception e) {
log.error("RedisMultiCache 异常", e);
try {
return valueLoader.call();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
return value;
}
/**
* 如果缓存中存储的为null,则直接返回null
* 如果 valueLoader 执行时抛出异常,则使用 RuntimeException 继续抛出,调用方自己处理
* @return {@code null} if cached null
*/
@Override
public <T> T get(Object key, Callable<T> valueLoader) {
T value;
try {
value = cache.get(key, valueLoader);
} catch (Exception e) {
log.error("RedisMultiCache 异常", e);
try {
return valueLoader.call();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
return value;
}
@Override
public void put(Object key, Object value) {
try {
cache.put(key, value);
} catch (Exception e) {
log.error("RedisMultiCache 异常", e);
}
}
@Override
public ValueWrapper putIfAbsent(Object key, Object value) {
try {
return cache.putIfAbsent(key, value);
} catch (Exception e) {
log.error("RedisMultiCache 异常", e);
}
return null;
}
@Override
public void evict(Object key) {
try {
cache.evict(key);
} catch (Exception e) {
log.error("RedisMultiCache 异常", e);
}
}
@Override
public void clear() {
try {
cache.clear();
} catch (Exception e) {
log.error("RedisMultiCache 异常", e);
}
}
/**
* 批量读取缓存,默认 key-value 为一对一的关系
* @see #list(List, Function, Function, Class, boolean)
*/
public <K, V> List<V> list(List<K> keys, Function<Collection<K>, Collection<V>> valueLoader, Function<V, K> keyMapper, Class<V> vClass) {
return list(keys, valueLoader, keyMapper, vClass, false);
}
/**
* 批量获取缓存数据, 如不存在则通过 valueLoader 获取数据, 并存入缓存中
* 如果缓存中存在 null,则视为不存在,仍然通过 valueLoader 加载,如需要防止缓存穿透,建议存入空对象,而非 null
* @param keys key
* @param valueLoader 数据加载器
* @param keyMapper 根据value获取key 映射器
* @param vClass 返回数据类型
* @param isListValue value是否为list类型,即一个key对应一个List<V>
* @param <K> key 的类型
* @param <V> value 的类型
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public <K, V> List<V> list(List<K> keys, Function<Collection<K>, Collection<V>> valueLoader, Function<V, K> keyMapper, Class<V> vClass, boolean isListValue) {
Objects.requireNonNull(redisTemplate, "redisTemplate required not null");
List list0 = Collections.emptyList();
try {
list0 = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
RedisSerializer keySerializer = redisTemplate.getKeySerializer();
for (K k : keys) {
byte[] key = keySerializer.serialize(createCacheKey(k));
if (key != null) {
connection.get(key);
} else {
log.warn("RedisMultiCache 批量操作序列化失败, key={}", k);
}
}
return null;
});
} catch (Exception e) {
log.error("RedisMultiCache 异常", e);
}
int size = keys.size();
// 缓存不存在的key
List<K> nkeys = new ArrayList<>(size);
List<V> values = new ArrayList<>(size);
if (CollectionUtils.isEmpty(list0)) {
nkeys.addAll(keys);
} else {
for (int i = 0; i < list0.size(); i++) {
Object o = list0.get(i);
if (o == null) {
nkeys.add(keys.get(i));
} else {
// redis中存储的是 JsonObject 或 JsonArray 对象
if (o instanceof JSONArray) {
values.addAll(((JSONArray) o).toJavaList(vClass));
} else if (o instanceof JSONObject) {
values.add(JSONObject.toJavaObject((JSONObject) o, vClass));
}
}
}
}
if (!CollectionUtils.isEmpty(nkeys)) {
Collection<V> nValue = valueLoader.apply(nkeys);
Map kvMap;
if (isListValue) {
kvMap = nValue.stream().filter(Objects::nonNull).collect(Collectors.groupingBy(keyMapper));
} else {
kvMap = nValue.stream().filter(Objects::nonNull).collect(Collectors.toMap(keyMapper, Function.identity()));
}
putBatch(kvMap);
values.addAll(nValue);
}
return values;
}
/**
* 批量存入缓存
* @param map 需要存入的数据
* @param <K> 数据的 key 的类型
* @param <V> 数据的 value 的类型
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public <K, V> void putBatch(Map<K, V> map) {
if (CollectionUtils.isEmpty(map)) {
return;
}
Objects.requireNonNull(redisTemplate, "redisTemplate required not null");
try {
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
RedisSerializer keySerializer = redisTemplate.getKeySerializer();
RedisSerializer valueSerializer = redisTemplate.getValueSerializer();
for (Map.Entry<K, V> entry : map.entrySet()) {
byte[] key = keySerializer.serialize(createCacheKey(entry.getKey()));
byte[] value = valueSerializer.serialize(entry.getValue());
if (key != null && value != null) {
connection.set(key, value);
} else {
log.warn("RedisMultiCache 批量操作序列化失败, entry={}", entry);
}
}
return null;
});
} catch (Exception e) {
log.error("RedisMultiCache 异常", e);
}
}
/**
* @param key 缓存数据的key
* @param <K> key 的类型
* @return 缓存数据在 redis 中的 key
*/
private <K> String createCacheKey(K key) {
return RedisCacheConfig.computeCachePrefix(getName())+ key.toString();
}
}
配置缓存 RedisCacheConfig
@Configuration
//@EnableCaching // 如果需要使用注解形式的缓存则加上该注解
public class RedisCacheConfig extends CachingConfigurerSupport {
/**
* 默认缓存存活时间(time to live), 2 小时
*/
private static final long DEFAULT_TTL_SECONDS = 2 * 60 * 60;
public static final String DATA_CACHE = "data_cache";
public static final String DATA_LIST_CACHE = "data_list_cache";
/*
* 注册对应的 RedisMultiCache
*/
@Bean("dataCache")
public RedisMultiCache dataCache(RedisTemplate redisTemplate, RedisCacheManager cacheManager) {
// 这里为了方便演示,创建缓存时,清空已存在的缓存数据,
return new RedisMultiCache(cacheManager.getCache(DATA_CACHE), redisTemplate, true);
}
@Bean("dataListCache")
public RedisMultiCache dataListCache(RedisTemplate redisTemplate, RedisCacheManager cacheManager) {
return new RedisMultiCache(cacheManager.getCache(DATA_LIST_CACHE), redisTemplate);
}
/*
* redis cache 配置
*/
/**
* 配置 redisTemplate 使用 fastjson 作为缓存数据的序列化方式
*/
@Bean("redisTemplate")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(StringRedisSerializer.UTF_8);
redisTemplate.setStringSerializer(StringRedisSerializer.UTF_8);
redisTemplate.setValueSerializer(new FastJsonRedisSerializer<>(Object.class));
return redisTemplate;
}
/**
* redis 缓存管理器
* 可以添加自定义缓存配置
*/
@Bean
public RedisCacheManager redisCacheManager(RedisConnectionFactory redisConnectionFactory) {
RedisCacheWriter redisCacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory);
return RedisCacheManager.builder(redisCacheWriter)
.withInitialCacheConfigurations(initCacheConfig())
// 设置默认的缓存配置,默认过期时间 2小时
.cacheDefaults(redisCacheConfig(DEFAULT_TTL_SECONDS))
.transactionAware()
.build();
}
/**
* 如果需要指定某个缓存的个性化的配置(如过期时间),可以在map里面添加
* @return 初始化缓存配置
*/
private Map<String, RedisCacheConfiguration> initCacheConfig() {
Map<String, RedisCacheConfiguration> configurationMap = new HashMap<>(4);
// 指定 data_cache 的过期时间为 半小时
configurationMap.put(DATA_CACHE, redisCacheConfig(30 * 60));
return configurationMap;
}
/**
* 设置 redis 缓存的配置
* 主要设置缓存时间, 和序列化方式
* @param seconds 指定过期时间 单位:秒
*/
@SuppressWarnings("unchecked")
private RedisCacheConfiguration redisCacheConfig(long seconds) {
return RedisCacheConfiguration.defaultCacheConfig()
// TODO 注意 这里的 key 和 value 的 serialize 务必和 redisTemplate 保持一致,否则批量操作和单个操作使用不一样的序列化工具会导致读取缓存时报错
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(StringRedisSerializer.UTF_8))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new FastJsonRedisSerializer<>(Object.class)))
.computePrefixWith(RedisCacheConfig::computeCachePrefix)
.entryTtl(Duration.ofSeconds(seconds));
}
/**
* 根据缓存名设置对应缓存的key的前缀,推荐使用 程序名+缓存名
* @param cacheName 缓存名(类似数据库表名)
*/
public static String computeCachePrefix(String cacheName) {
return "appName-" + cacheName + "::";
}
}
使用示例
@Resource(name = "dataCache")
private RedisMultiCache dataCache;
@Test
public void testCache(){
List<Data> dataList = new ArrayList<>(5);
List<Integer> keys = Lists.newArrayList(1,2,3,4,5);
for (int i = 1; i < 6; i++) {
dataList.add(new Data(i, String.valueOf(i)));
}
Map<Integer, Data> dataMap = dataList.stream().collect(Collectors.toMap(Data::getId, Function.identity()));
dataCache.putBatch(dataMap);
dataCache.put(10, new Data(10,"10"));
System.out.println("10===" + dataCache.get(10).get());
System.out.println("5===" + dataCache.get(5).get());
// 加一个不存在的key
keys.add(100);
// 测试 null
dataCache.put(20, null);
keys.add(20);
Cache.ValueWrapper wrapper = dataCache.get(20);
System.out.println(wrapper); // wrapper 不为null,说明存在null值
List<Data> list1 = dataCache.list(keys, this::listData, Data::getId, Data.class);
System.out.println(list1); // debug 或 看listData的输出 可以发现,100是从listData加载的
}
/**
* @param keys 注意 这个参数类型 一定要是 Collection 类型
* @return
*/
private List<Data> listData(Collection<Integer> keys) {
List<Data> dataList = new ArrayList<>(keys.size());
for (Integer key : keys) {
System.out.println("create data =》 "+key);
dataList.add(new Data(key, String.valueOf(key)));
}
return dataList;
}
需要注意的问题
-
保持序列化方式一样
应该保证 redisTemplate 和 RedisCacheManage 中使用同一种序列化方式,如果序列化方式不同,则缓存单一操作和批量操作的缓存数据不同步(即读取单个缓存时读取不到批量存入的,批量的读取不到单个的),或者在反序列化时抛出异常 -
valueLoader 方法 参数应该使用Collection
不能使用List类型,否则不能确定范型。使用Collection是为了兼容传入参数为Set的情况。且部分数据库框架的参数类型也用的Collection接口而非List。 -
批量读取时,会忽视缓存中的null值
如果在缓存中存入了value为null,使用get仍然可以得到,但使用list批量读取时,无法读取到。如需防止缓存穿透,请使用空对象,而非null。当然也可以自己重写里面的 put 或 list 的逻辑。 -
批量查询的数量不易过大
如果一次查询大量的数据,pipeline 效率也会降低。考虑到 valueLoader 如果从数据库中获取数据,也要限制批次,因此没有在批量操作中加入分批的逻辑,如需查询大量数据,请调用前进行分批。使用Guava的Lists.partition
可以很方便的对数据进行分批。
代码已上传至 GitHub 点击查看
本文地址:https://blog.csdn.net/Wu_Shang001/article/details/107883274