Redis实现分布式锁和等待序列的方法示例
程序员文章站
2022-06-17 21:21:43
在集群下,经常会因为同时处理发生资源争抢和并发问题,但是我们都知道同步锁 synchronized 、 cas 、 reentranklo...
在集群下,经常会因为同时处理发生资源争抢和并发问题,但是我们都知道同步锁 synchronized 、 cas 、 reentranklock 这些锁的作用范围都是 jvm ,说白了在集群下没啥用。这时我们就需要能在多台 jvm 之间决定执行顺序的锁了,现在分布式锁主要有 redis 、 zookeeper 实现的,还有数据库的方式,不过性能太差,也就是需要一个第三方的监管。
背景
最近在做一个消费 kafka 消息的时候发现,由于线上的消费者过多,经常会遇到,多个机器同时处理一个主键类型的数据的情况发生,如果最后是执行更新操作的话,也就是一个更新顺序的问题,但是如果恰好都需要插入数据的时候,会出现主键重复的问题。这是生产上不被允许的(因为公司有异常监管的机制,扣分啥的),这是就需要个分布式锁了,斟酌后用了 redis 的实现方式(因为网上例子多)
分析
redis 实现的分布式锁,实现原理是 set 方法,因为多个线程同时请求的时候,只有一个线程可以成功并返回结果,还可以设置有效期,来避免死锁的发生,一切都是这么的完美,不过有个问题,在 set 的时候,会直接返回结果,成功或者失败,不具有阻塞效果,需要我们自己对失败的线程进程处理,有两种方式
- 丢弃
- 等待重试 由于我们的系统需要这些数据,那么只能重新尝试获取。这里使用 redis 的 list 类型实现等待序列的作用
代码
直接上代码 其实直接redis的工具类就可以解决了
package com.test import redis.clients.jedis.jedis; import java.util.collections; import java.util.list; /** * @desc redis队列实现方式 * @anthor * @date **/ public class redisucuitl { private static final string lock_success = "ok"; private static final string set_if_not_exist = "nx"; private static final string set_with_expire_time = "px"; private static final long release_success = 1l; private redisucuitl() { } /** * logger **/ /** * 存储redis队列顺序存储 在队列首部存入 * * @param key 字节类型 * @param value 字节类型 */ public static long lpush(jedis jedis, final byte[] key, final byte[] value) { return jedis.lpush(key, value); } /** * 移除列表中最后一个元素 并将改元素添加入另一个列表中 ,当列表为空时 将阻塞连接 直到等待超时 * * @param srckey * @param dstkey * @param timeout 0 表示永不超时 * @return */ public static byte[] brpoplpush(jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) { return jedis.brpoplpush(srckey, dstkey, timeout); } /** * 返回制定的key,起始位置的redis数据 * @param rediskey * @param start * @param end -1 表示到最后 * @return */ public static list<byte[]> lrange(jedis jedis,final byte[] rediskey, final long start, final long end) { return jedis.lrange(rediskey, start, end); } /** * 删除key * @param rediskey */ public static void delete(jedis jedis, final byte[] rediskey) { return jedis.del(rediskey); } /** * 尝试加锁 * @param lockkey key名称 * @param requestid 身份标识 * @param expiretime 过期时间 * @return */ public static boolean trygetdistributedlock(jedis jedis,final string lockkey, final string requestid, final int expiretime) { string result = jedis.set(lockkey, requestid, set_if_not_exist, set_with_expire_time, expiretime); return lock_success.equals(result); } /** * 释放锁 * @param lockkey key名称 * @param requestid 身份标识 * @return */ public static boolean releasedistributedlock(jedis jedis,final string lockkey, final string requestid) { final string script = "if redis.call('get', keys[1]) == argv[1] then return redis.call('del', keys[1]) else return 0 end"; jedis.eval(script, collections.singletonlist(lockkey), collections.singletonlist(requestid)); return release_success.equals(result); } }
业务逻辑主要代码如下
1.先消耗队列中的
while(true){ // 消费队列 try{ // 被放入redis队列的数据 序列化后的 byte[] bytes = redisucuitl.brpoplpush(keystr.getbytes(utf_8), dstkeystr.getbytes(utf_8), 1); if(bytes == null || bytes.isempty()){ // 队列中没数据时退出 break; } // 反序列化对象 map<string, object> singlemap = (map<string, object>) objectserialutil.bytestoobject(bytes); // 塞入唯一的值 防止被其他线程误解锁 string requestid = uuid.randomuuid().tostring(); boolean lockgetflag = redisucuitl.trygetdistributedlock(keystr,requestid, 100); if(lockgetflag){ // 成功获取锁 进行业务处理 //todo // 处理完毕释放锁 boolean freelock = redisucuitl.releasedistributedlock(keystr, requestid); }else{ // 未能获得锁放入等待队列 redisucuitl.lpush(keystr.getbytes(utf_8), objectserialutil.objecttobytes(param)); } }catch(exception e){ break; } }
2.处理最新接到的数据
同样是走尝试获取锁,获取不到放入队列的流程
一般序列化用 fastjson 之列的就可以了,这里用的是 jdk 自带的,工具类如下
public class objectserialutil { private objectserialutil() { // 工具类 } /** * 将object对象序列化为byte[] * * @param obj 对象 * @return byte数组 * @throws exception */ public static byte[] objecttobytes(object obj) throws ioexception { bytearrayoutputstream bos = new bytearrayoutputstream(); objectoutputstream oos = new objectoutputstream(bos); oos.writeobject(obj); byte[] bytes = bos.tobytearray(); bos.close(); oos.close(); return bytes; } /** * 将bytes数组还原为对象 * * @param bytes * @return * @throws exception */ public static object bytestoobject(byte[] bytes) { try { bytearrayinputstream bin = new bytearrayinputstream(bytes); objectinputstream ois = new objectinputstream(bin); return ois.readobject(); } catch (exception e) { throw new baseexception("反序列化出错!", e); } } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。