java实现的redis分布式锁
程序员文章站
2022-03-08 11:32:38
...
业务场景多了,就应该把场景代码工具化,减少重复代码;趁周六在这里总结一下java实现的redis分布式锁代码;
使用的技术点:
1、redis函数setnx;
2、redis监控函数watch;
3、String.intern在jvm的内存操作,和String.intern的替代方案:guava;
4、@FunctionalInterface函数接口和lambda表达式的应用。
5、RedisTemplate;尽量使用RedisTemplate操作redis;直接操作jedispool人多的时候会乱。
redis锁操作类
package com.framework.cache; import java.util.Random; import com.framework.cache.execption.RedisExecption; /** * redis锁操作类 * @author lyq * */ public class RedisLock { public static final String LOCKED = "TRUE"; public static final long MILLI_NANO_CONVERSION = 1000000L; public static final long DEFAULT_TIME_OUT = 1000L; public static final Random RANDOM = new Random(); public static final int EXPIRE = 1200; private String key; private String lockId; private boolean locked = false; public RedisLock(String key) { lockId = System.currentTimeMillis()+""; this.key = String.format("lock:%s", key); } /** * 加锁 * @param expireSeconds 超时时间,单位:秒 * @return 获取锁成功返回true;超时返回false;其它报异常 */ public boolean lock(int expireSeconds) { long nano = System.nanoTime(); long timeout= expireSeconds * 1000000000L; try { // 当前任务超时的时候,可能会被其它任务抢到锁,该任务会误删其它任务的锁;所以加入判断。 if (RedisHelper.getExpire(this.key) == -1) { RedisHelper.expire(this.key, expireSeconds); } while (System.nanoTime() - nano < timeout) { if (RedisHelper.setnx(this.key, lockId)) { RedisHelper.expire(this.key, expireSeconds); this.locked = true; return this.locked; } Thread.sleep(3L, RANDOM.nextInt(500)); } } catch (Exception e) { throw new RedisExecption("Locking error", e); } return false; } public boolean lock() { return lock(2); } /** * 解锁 */ @SuppressWarnings("unchecked") public void unlock() { if (this.locked) { String _lockId = this.lockId; RedisHelper.getRedisTemplate().execute(connection->{ byte[] keyByte = RedisHelper.serialize(key); //开启watch之后,如果key的值被修改,则事务失败,exec方法返回null connection.watch(keyByte); String startTime = RedisHelper.unserialize(connection.get(keyByte), String.class); if (_lockId.equals(startTime)) { // 当前任务超时的时候,可能会被其它任务抢到锁,该任务会误删其它任务的锁;所以加入判断。 connection.multi(); connection.del(keyByte); connection.exec(); } connection.unwatch(); return true; },true); } } public String getLockKey(){ return this.key; } }
redis对外帮助类
package com.framework.cache; import java.lang.reflect.Type; import java.util.List; import java.util.Properties; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import com.framework.cache.execption.RedisExecption; import com.framework.cache.execption.RedisLockTimeException; import com.framework.cache.lock.LockCallback; import com.framework.utils.NumUtil; import com.google.common.collect.Interner; import com.google.common.collect.Interners; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import redis.clients.jedis.JedisPoolConfig; /** * redis对外静态帮助类 * @author lyq * */ public class RedisHelper { public static RedisTemplate redisTemplate = null; private static Interner<Object> pool = Interners.newWeakInterner(); private static final Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd hh:mm:ss").create(); private static final String IGNORE_EXEC_KEY = "_ignoreExec"; /** * 加锁执行 * @param callBack 回调函数 * @param lockKey * @param expireSeconds 执行超时时间,单位:秒 * @return 返回callback回调函数执行结果 */ public static <T> T lockExec(LockCallback<T> callBack, String lockKey, Integer expireSeconds) { /** * redisLock其实就是规定时间内一直循环网络请求redis资源; * 为了防止并发的时候,太多线程循环抢redis锁;导致资源不足,需要对字符串加锁; * 字符加锁,需要使用intern;而1.7以后,String.intern的对象会一直存在jvm内存里面,大量使用会导致内存异常;替代方案就是使用guava的Interners.newWeakInterner()代替String.intern; * synchronized不支持过期,如果需要加过期时间,则使用ReentrantLock 的 tryLock(long timeout, TimeUnit unit)方法。 */ synchronized (pool.intern(lockKey)) { RedisLock lock = new 
RedisLock(lockKey); try { if(lock.lock(expireSeconds)){ return callBack.exec(); }else{ throw new RedisLockTimeException(lockKey,expireSeconds); } } catch (Exception e) { throw new RedisExecption(e); } finally { lock.unlock(); } } } /** * 获取超时时间 * @param key * @return 单位秒 */ public static Long getExpire(String key){ byte[] rawKey = RedisHelper.serialize(key); return (Long) redisTemplate.execute(connection -> connection.ttl(rawKey), true); } /** * 设置超时时间 * @param key * @param seconds 单位秒 * @return */ public static Boolean expire(String key,Integer seconds){ byte[] rawKey = RedisHelper.serialize(key); return (Boolean) redisTemplate.execute(connection -> connection.expire(rawKey,seconds), true); } /** * 尝试加锁并执行操作 如果加锁失败直接返回 如果加锁成功,则指定锁时间 * * @param action * 待执行的动作 * @param key * 加锁的key * @param expireSeconds * 锁超时时间,单位秒 * @param <T> * 泛型参数 * @return */ public static <T> T mutexExec(LockCallback<T> action, String key, int expireSeconds) { if (setnx(key + IGNORE_EXEC_KEY, "TRUE", expireSeconds)) { try { return action.exec(); } catch (Exception e) { throw new RedisExecption("ignoreExec error", e); } finally { del(key + IGNORE_EXEC_KEY); } } return null; } /** * 设置值成功返回true,key已存在则设置值失败,返回false * @param key * @param value * @param expireSeconds,设置值过期时间,即使设置值失败,也会一直覆盖过期是时间 * @return * @throws RedisExecption */ @SuppressWarnings({ "unchecked" }) public static boolean setnx(final String key, final Object value, final Integer expireSeconds) throws RedisExecption { if (StringUtils.isEmpty(key) || value == null) { throw new RedisExecption("key或value不能为空"); } return (Boolean) redisTemplate.execute(connection-> { try { Boolean result = false; connection.multi(); setnx(key, value, expireSeconds, connection); List<Object> list = connection.exec(); if (CollectionUtils.isNotEmpty(list)) { result = (Boolean) list.get(0); } return result; } catch (Exception e) { // connection.discard(); throw new RuntimeException(e); } }, true); } /** * 设置值成功返回true,key已存在则设置值失败,返回false * @param key * 
@param value * @return * @throws RedisExecption */ @SuppressWarnings({ "unchecked" }) public static boolean setnx(final String key, final Object value) throws RedisExecption { if (StringUtils.isEmpty(key) || value == null) { throw new RedisExecption("key或value不能为空"); } return (Boolean) redisTemplate.execute(connection-> { try { Boolean result = false; connection.multi(); setnx(key, value, -1, connection); List<Object> list = connection.exec(); if (CollectionUtils.isNotEmpty(list)) { result = (Boolean) list.get(0); } return result; } catch (Exception e) { // connection.discard(); throw new RuntimeException(e); } }, true); } /** * 事务执行,不返回 * @param key * @param value * @param expireSeconds * @param connection */ public static void setnx(final String key, final Object value, final Integer expireSeconds, final RedisConnection connection) { byte[] byteKey = serialize(key); connection.setNX(byteKey, serialize(value)); if (NumUtil.intValue(expireSeconds) != -1) { connection.expire(byteKey, expireSeconds); } } /** * 删除键 * @param key * @return * @throws RedisExecption */ @SuppressWarnings("unchecked") public static Long del(final String key) throws RedisExecption { return (Long) redisTemplate.execute(connection-> del(key, connection), true); } public static Long del(final String key, final RedisConnection connection) { try { return connection.del(serialize(key)); } catch (Exception e) { throw new RuntimeException(e); } } /** * 序列化 * @param value * @return */ public static byte[] serialize(Object value) { String valStr = getJsonStr(value); return redisTemplate.getStringSerializer().serialize(valStr); } /** * 返序列化 * @param bytes * @param clz * @return */ public static <T> T unserialize(byte[] bytes, Class<T> clz) { // String valStr = getJsonStr); Object json = redisTemplate.getStringSerializer().deserialize(bytes); if (json == null) { return null; } if (clz == null || clz.getName().equals("java.lang.String")) { return (T) json; } else { return gson.fromJson(json.toString(), 
clz); } } /** * private static java.lang.reflect.Type imgType = new TypeToken<ArrayList * <T>>() {}.getType(); * * @param bytes * @param type * @return */ public static <T> T unserialize(byte[] bytes, Type type) { // String valStr = getJsonStr); Object json = redisTemplate.getStringSerializer().deserialize(bytes); if (json == null) { return null; } if (type == null) { return (T) json; } else { return gson.fromJson(json.toString(), type); } } private static String getJsonStr(Object value) { String valStr = null; if (value == null) { valStr = null; } if (value instanceof String) { valStr = value.toString(); } else if (value instanceof Number) { valStr = value + ""; } else { valStr = gson.toJson(value); } return valStr; } @SuppressWarnings("rawtypes") public static RedisTemplate getRedisTemplate() { return redisTemplate; } @SuppressWarnings({ "rawtypes", "static-access" }) public void setRedisTemplate(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } }
redis异常处理类
package com.framework.cache.execption; import java.io.PrintWriter; import java.io.StringWriter; /** * redis异常类,redis执行异常报错 * @author lyq * */ public class RedisExecption extends RuntimeException { private static final long serialVersionUID = 155L; //private Logger log = Logger.getLogger(this.getClass()); public RedisExecption() { } public RedisExecption(String message) { super(message); } public RedisExecption(Throwable cause) { super(cause); } public RedisExecption(String message, Throwable cause) { super(message, cause); } /** * 以字符串形式返回异常堆栈信息 * @param e * @return 异常堆栈信息字符串 */ public static String getStackTrace(Exception e) { StringWriter writer = new StringWriter(); e.printStackTrace(new PrintWriter(writer,true)); return writer.toString(); } }
分布式锁超时异常
package com.framework.cache.execption;

/**
 * Signals that waiting for a distributed lock timed out.
 *
 * @author lyq
 */
public class RedisLockTimeException extends RedisExecption {

    private static final long serialVersionUID = 1L;

    /**
     * Creates the timeout exception for a lock.
     *
     * @param lockKey       key of the lock that could not be acquired
     * @param expireSeconds how long the caller waited, in seconds
     */
    public RedisLockTimeException(String lockKey, Integer expireSeconds) {
        super(describe(lockKey, expireSeconds));
    }

    /** Builds the exception message from the lock key and the wait time. */
    private static String describe(String lockKey, Integer expireSeconds) {
        return String.format("分布式锁 ‘%s’等待超时: %ss", lockKey, expireSeconds);
    }
}
分布式锁测试类
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import com.framework.cache.RedisHelper; import redis.clients.jedis.JedisPoolConfig; public class RedisTest { public static void main(String[] args) { RedisTemplate t = new RedisTemplate(); // InputStream is = // RedisHelper.class.getClassLoader().getResourceAsStream("redis.properties"); String hostname = ""; String password = ""; String port = ""; // try { // prop.load(is); // } catch (IOException e) { // // TODO Auto-generated catch block // e.printStackTrace(); // } /*** 热备start ***/ // String sentinelHost = "localhost"; // int sentinelport = 26379; // RedisSentinelConfiguration sentinelConfiguration = new // RedisSentinelConfiguration(); // sentinelConfiguration.setMaster("mymaster"); // sentinelConfiguration.sentinel(sentinelHost, sentinelport); // JedisConnectionFactory connectionFactory = new // JedisConnectionFactory(sentinelConfiguration); /*** 热备end ***/ /*** 单机start ***/ // hostname = "120.25.226.230"; hostname = "127.0.0.1"; password = ""; port = "6379"; JedisConnectionFactory connectionFactory = new JedisConnectionFactory(); connectionFactory.setPassword(password); connectionFactory.setDatabase(5); connectionFactory.setHostName(hostname); connectionFactory.setPort(Integer.parseInt(port)); /*** 单机end ***/ JedisPoolConfig config = new redis.clients.jedis.JedisPoolConfig(); config.setMaxTotal(5000); config.setMaxIdle(200); config.setMaxWaitMillis(5000); config.setTestOnBorrow(true); connectionFactory.afterPropertiesSet(); t.setConnectionFactory(connectionFactory); connectionFactory.setPoolConfig(config); t.afterPropertiesSet(); RedisHelper.redisTemplate = t; 
/************************************ * 分布式锁测试 start *****************************************/ ThreadPoolExecutor executor = null; // 有限队列,缓存30个请求等待,线程池默认策略,缓存超出上限报错 BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(30); // 初始化30个线程,最多30个线程 executor = new ThreadPoolExecutor(30, 30, 10, TimeUnit.SECONDS, workQueue, new ThreadFactory() { public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("redis-lock-thread"); return thread; } }); CyclicBarrier barrier = new CyclicBarrier(30); for (Integer i = 0; i < 30; i++) { final int _i = i; executor.execute(() -> { try { barrier.await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } //锁等待10秒超时 RedisHelper.lockExec(() -> { System.out.println(_i + "====="); try { Thread.sleep(2000); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return 2; }, "test",10); }); } /************************************ * 分布式锁测试 end *****************************************/ } }