欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spring boot 之 多级缓存实现

程序员文章站 2022-04-27 18:49:27
...

一、为什么使用多级缓存

        缓存的使用是解决高并发问题的一个重要途径,所以缓存很重要。一般情况下使用本地缓存,如ehcache,guava等就可以了,但是针对分布式、集群架构,本地缓存无法做到相互之间数据保持一致,如果使用redis缓存,则需要不断的去连接redis,这个中间也是有一定的资源消耗,在并发较小的时候这些消耗不影响系统,但是并发较大时就可能导致无法获取到redis连接,从而导致系统奔溃等等问题。

二、搭建自己的二级缓存

    要想搭建自己的二级缓存平台就需要了解spring boot的cache实现机制,cache的实现核心就是org.springframework.cache.Cache接口和org.springframework.cache.CacheManager接口,前者是缓存类,后者是管理缓存的,所以只要我们重写了这两个类,那么就可以实现缓存的重写。

1.重写cache

cache提供了一下方法

    String getName();

    Object getNativeCache();

    Cache.ValueWrapper get(Object var1);

    <T> T get(Object var1, Class<T> var2);

    <T> T get(Object var1, Callable<T> var2);

    void put(Object var1, Object var2);

    Cache.ValueWrapper putIfAbsent(Object var1, Object var2);

    void evict(Object var1);

    void clear();

这里我们只需要重写这些方法就行了,由于我们要实现二级缓存,所以要引入本地缓存和redis 缓存,也就是我们只需要在自定义的cache中实现本地缓存和redis 缓存同时使用就可以了。具体实现如下:

package com.wangcongming.cache.core.layering;

import com.alibaba.fastjson.JSON;
import com.wangcongming.cache.core.redis.cache.CustomizedRedisCache;
import com.wangcongming.cache.core.redis.listener.ChannelTopicEnum;
import com.wangcongming.cache.core.redis.listener.RedisPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.caffeine.CaffeineCache;
import org.springframework.cache.support.AbstractValueAdaptingCache;
import org.springframework.cache.support.NullValue;
import org.springframework.data.redis.core.RedisOperations;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

/**
 * @author wangcongming
 * @Package com.wangcongming.cache.core.layering
 * @Description: 双缓存类  代码来源于网络,并在此基础做了修改
 * @date 2018/5/18 15:16
 */
public class LayeringCache extends AbstractValueAdaptingCache {
    Logger logger = LoggerFactory.getLogger(LayeringCache.class);

    /**
     * 缓存的名称
     */
    private final String name;

    /**
     * 是否使用一级缓存
     */
    private boolean usedFirstCache = true;

    /**
     * redi缓存
     */
    private final CustomizedRedisCache redisCache;

    /**
     * Caffeine缓存
     */
    private final CaffeineCache caffeineCache;

    RedisOperations<? extends Object, ? extends Object> redisOperations;

    /**
     * @param name              缓存名称
     * @param prefix            缓存前缀
     * @param redisOperations   操作Redis的RedisTemplate
     * @param expiration        redis缓存过期时间
     * @param allowNullValues   是否允许存NULL,默认是false
     * @param usedFirstCache    是否使用一级缓存,默认是true
     * @param caffeineCache     Caffeine缓存
     */
    public LayeringCache(String name, byte[] prefix, RedisOperations<? extends Object, ? extends Object> redisOperations,
                         long expiration, boolean allowNullValues, boolean usedFirstCache,
                         com.github.benmanes.caffeine.cache.Cache<Object, Object> caffeineCache) {

        super(allowNullValues);
        this.name = name;
        this.usedFirstCache = usedFirstCache;
        this.redisOperations = redisOperations;
        this.redisCache = new CustomizedRedisCache(name, prefix, redisOperations, expiration, allowNullValues);
        this.caffeineCache = new CaffeineCache(name, caffeineCache, allowNullValues);
    }

    @Override
    public String getName() {
        return this.name;
    }

    @Override
    public Object getNativeCache() {
        return this;
    }

    public CustomizedRedisCache getSecondaryCache() {
        return this.redisCache;
    }

    public CaffeineCache getFirstCache() {
        return this.caffeineCache;
    }

    @Override
    public ValueWrapper get(Object key) {
        ValueWrapper wrapper = null;
        if (usedFirstCache) {
            // 查询一级缓存
            wrapper = caffeineCache.get(key);
            logger.debug("查询一级缓存 key:{},返回值是:{}", key, JSON.toJSONString(wrapper));
        }

        if (wrapper == null) {
            // 查询二级缓存
            wrapper = redisCache.get(key);
//            caffeineCache.put(key, wrapper == null ? null : JSON.toJSONString(wrapper));
            logger.debug("查询二级缓存,并将数据放到一级缓存。 key:{},返回值是:{}", key, JSON.toJSONString(wrapper));
        }
        return wrapper;
    }

    @Override
    public <T> T get(Object key, Class<T> type) {
        T value = null;
        if (usedFirstCache) {
            // 查询一级缓存
            value = caffeineCache.get(key, type);
            logger.debug("查询一级缓存 key:{},返回值是:{}", key, JSON.toJSONString(value));
        }

        if (value == null) {
            // 查询二级缓存
            value = redisCache.get(key, type);
            caffeineCache.put(key, value);
            logger.debug("查询二级缓存,并将数据放到一级缓存。 key:{},返回值是:{}", key, JSON.toJSONString(value));
        }
        return value;
    }

    @SuppressWarnings("unchecked")
	@Override
    public <T> T get(Object key, Callable<T> valueLoader) {
        T value = null;
        if (usedFirstCache) {
//            Object o = caffeineCache.get(key).get();
            // 查询一级缓存,如果一级缓存没有值则调用getForSecondaryCache(k, valueLoader)查询二级缓存
            value = (T) caffeineCache.getNativeCache().get(key, k -> getForSecondaryCache(k, valueLoader));
//            this.put(key,value);
        } else {
//             直接查询二级缓存
            value = (T) getForSecondaryCache(key, valueLoader);
        }

        if (value instanceof NullValue) {
            return null;
        }
        return value;
    }

    @Override
    public void put(Object key, Object value) {
        if (usedFirstCache) {
            caffeineCache.put(key, value);
        }
        redisCache.put(key, value);
    }

    @Override
    public ValueWrapper putIfAbsent(Object key, Object value) {
        if (usedFirstCache) {
            caffeineCache.putIfAbsent(key, value);
        }
        return redisCache.putIfAbsent(key, value);
    }

    @Override
    public void evict(Object key) {
        // 删除的时候要先删除二级缓存再删除一级缓存,否则有并发问题
        redisCache.evict(key);
        if (usedFirstCache) {
            // 删除一级缓存需要用到redis的Pub/Sub(订阅/发布)模式,否则集群中其他服服务器节点的一级缓存数据无法删除
            Map<String, Object> message = new HashMap<>();
            message.put("cacheName", name);
            message.put("key", key);
            // 创建redis发布者
            RedisPublisher redisPublisher = new RedisPublisher(redisOperations, ChannelTopicEnum.REDIS_CACHE_DELETE_TOPIC.getChannelTopic());
            // 发布消息
            redisPublisher.publisher(message);
        }
    }

    @Override
    public void clear() {
        redisCache.clear();
        if (usedFirstCache) {
            // 清除一级缓存需要用到redis的订阅/发布模式,否则集群中其他服服务器节点的一级缓存数据无法删除
            Map<String, Object> message = new HashMap<>();
            message.put("cacheName", name);
            // 创建redis发布者
            RedisPublisher redisPublisher = new RedisPublisher(redisOperations, ChannelTopicEnum.REDIS_CACHE_CLEAR_TOPIC.getChannelTopic());
            // 发布消息
            redisPublisher.publisher(message);
        }
    }

    @Override
    protected Object lookup(Object key) {
        Object value = null;
        if (usedFirstCache) {
            value = caffeineCache.get(key);
            logger.debug("查询一级缓存 key:{},返回值是:{}", key, JSON.toJSONString(value));
        }
        if (value == null) {
            value = redisCache.get(key);
            logger.debug("查询二级缓存 key:{},返回值是:{}", key, JSON.toJSONString(value));
        }
        return value;
    }

    /**
     * 查询二级缓存
     *
     * @param key
     * @param valueLoader
     * @return
     */
    private <T> Object getForSecondaryCache(Object key, Callable<T> valueLoader) {
        T value = redisCache.get(key, valueLoader);
        logger.debug("查询二级缓存 key:{},返回值是:{}", key, JSON.toJSONString(value));
        return toStoreValue(value);
    }
}

从上述代码我们可以看到,在LayeringCache的构造方法中我们直接创建了CaffeineCache和redisCache(这里的rediscache进行了重写),然后在get方法中,先去取本地缓存,没有才会去查询redisCache

现在缓存已经有了,那么怎么去实现缓存的创建呢,这里就要使用cachemanager来实现缓存的管理。

2.重写CacheManager

cachemanager是来管理缓存的,他负责cache的创建

由于我们重写了cache,那么原有的cachemanager就无法再创建此cache,所以需要重写。

首先我们来看cachemanager的具体方法有哪些

package org.springframework.cache;

import java.util.Collection;

public interface CacheManager {
    Cache getCache(String var1);

    Collection<String> getCacheNames();
}

我们可以看到cachemanager接口只提供了两个方法,一个是获取cache,一个是获取所有cache name。

也就是我们只需要重写这两个方法就可以了

具体实现如下:

private final ConcurrentMap<String, Cache> cacheMap = new ConcurrentHashMap<String, Cache>(16);
private Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder()
        .expireAfterAccess(DEFAULT_EXPIRE_AFTER_WRITE, TimeUnit.HOURS)
        .initialCapacity(DEFAULT_INITIAL_CAPACITY)
        .maximumSize(DEFAULT_MAXIMUM_SIZE);
    public LayeringCacheManager(RedisOperations redisOperations) {
        this(redisOperations, Collections.<String>emptyList());
    }

    public LayeringCacheManager(RedisOperations redisOperations, Collection<String> cacheNames) {
        this(redisOperations, cacheNames, false);
    }

    public LayeringCacheManager(RedisOperations redisOperations, Collection<String> cacheNames, boolean allowNullValues) {
        this.allowNullValues = allowNullValues;
        this.redisOperations = redisOperations;
        setCacheNames(cacheNames);
    }

    @Override
    public Cache getCache(String name) {
        Cache cache = this.cacheMap.get(name);
        if (cache == null && this.dynamic) {
            synchronized (this.cacheMap) {
                cache = this.cacheMap.get(name);
                if (cache == null) {
                    cache = createCache(name);
                    this.cacheMap.put(name, cache);
                }
            }
        }
        return cache;
    }

   protected Cache createCache(String name) {
        return new LayeringCache(name, (usePrefix ? cachePrefix.prefix(name) : null), redisOperations,
                getSecondaryCacheExpirationSecondTime(name), isAllowNullValues(), getUsedFirstCache(name),
                createNativeCaffeineCache(name));
    }

    @Override
    public Collection<String> getCacheNames() {
        return Collections.unmodifiableSet(this.cacheMap.keySet());
    }
如此我们就实现了二级缓存


注:部分代码来源于网络