使用Redis实现延时消息队列(Sorted Set)
程序员文章站
2022-06-26 17:05:46
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档文章目录一、场景设计二、延时队列实现Sorted Set相关命令三、演示总结一、场景设计1.用户下单15分钟未付款,取消订单恢复库存.二、延时队列实现订单创建的时候,订单ID和当前时间戳分别作为Sorted Set的member和score添加到订单队列Sorted Set中线程通过Sorted Set的命令ZREVRANGEBYSCORE弹出指定数量的订单ID进行处理。项目3Sorted Set相关命令三、演示总结...
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
一、场景设计
1.用户下单15分钟未付款,取消订单恢复库存.
二、延时队列实现
- 订单创建的时候,订单ID和当前时间戳分别作为Sorted Set的member和score添加到订单队列Sorted Set中
- 通过Sorted Set的命令ZREVRANGEBYSCORE返回指定分数区间内的所有订单ID进行处理。
- 同时删除返回指定分数区间内的所有数据,并且由于指定分数区间内的所有数据已经在Redis中删除,如果数据处理失败则可能需要从数据库重新查询补偿。
- 第二步和第三部必须要保存原子性所有必须在lua脚本中执行
Sorted Set相关命令
-
ZADD命令 - 将一个或多个成员元素及其分数值加入到有序集当中
zadd key score1 value1… scoren valuen -
ZREVRANGEBYSCORE命令 - 返回有序集中指定分数区间内的所有的成员。有序集成员按分数值递减(从大到小)的次序排列。
zrevrangebyscore key max min [withscores] [limit offset count] -
ZREM命令 - 用于移除有序集中的一个或多个成员,不存在的成员将被忽略
zrem key member [member …]
RedisDelayQueue
延迟队列接口
/**
* 延迟队列接口
*
* @Author:hjk
* @Date:2021/3/1 14:11
*/
public interface IDelayQueue<T> {
/**
* 入队
*
* @param message
*/
void enqueue(T message);
/**
* 出队
*
* @param min 分数区间 - 最大分数
* @param max 分数区间 - 最小分数
* @param offset offset和limit原理和MySQL的LIMIT offset一致,如果不指定此参数则返回整个集合的数据
* @param limit
* @return
*/
List<T> dequeue(String min, String max, String offset, String limit);
/**
* 出队
*
* @return
*/
List<T> dequeue();
}
延迟队列接口 实现
/**
* 延迟队列接口 实现
*
* @Author:hjk
* @Date:2021/3/1 14:14
*/
@Component
public class OrderDelayQueue implements IDelayQueue<Order>, InitializingBean {
/**
* 分数区间 - 最小分数 默认最小值
*/
private static final String MIN_SCORE = "0";
private static final String OFFSET = "0";
private static final String LIMIT = "10";
/**
* 延迟队列名称
*/
private static final String ORDER_QUEUE = "ORDER_DELAY_QUEUE";
private static final String DEQUEUE_LUA = "dequeue.lua";
private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();
@Resource
public RedisTemplate<Object, Object> redisTemplate;
@Override
public void enqueue(Order order) {
//60秒后执行
String s = String.valueOf(order.getCreateTime().getTime() + 60 * 1000);
redisTemplate
.opsForZSet()
.add(ORDER_QUEUE, JSON.toJSONString(order), Double.parseDouble(s));
}
@Override
public List<Order> dequeue(String min, String max, String offset, String limit) {
RedisScript<List<String>> redisScript = RedisScript.of(DEQUEUE_LUA_SHA.get(), List.class);
List<Object> keys = Lists.newArrayList();
keys.add(ORDER_QUEUE);
keys.add(min);
keys.add(max);
keys.add(offset);
keys.add(limit);
List<String> list = redisTemplate.execute(redisScript, keys);
List<Order> result = new ArrayList<>();
if (!CollectionUtils.isEmpty(list)) {
for (String order : list) {
if (StringUtils.isNotBlank(order)) {
result.add(JSON.parseObject(order, Order.class));
}
}
}
return result;
}
@Override
public List<Order> dequeue() {
//zset 分数区间 - 最大分数 为当前时间戳
String maxScore = String.valueOf(System.currentTimeMillis());
return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT);
}
/**
* 实现 InitializingBean 在初始化bean的时候都会执行该方法
*
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {
ClassPathResource resource = new ClassPathResource(DEQUEUE_LUA);
String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
//如果当前值 ==为预期值,则将luaContent设置为给定的更新值
DEQUEUE_LUA_SHA.compareAndSet(null, luaContent);
}
}
lua
local zset_key = KEYS[1]
local min_score = KEYS[2]
local max_score = KEYS[3]
local offset = KEYS[4]
local limit = KEYS[5]
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
if type == 'zset' then
local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
if list ~= nil and #list > 0 then
redis.call('ZREM', zset_key, unpack(list))
return list
else
end
end
end
Consumer
@Component
public class OrderConsumer {
private static final Logger log = LoggerFactory.getLogger(OrderMessageConsumer.class);
@Autowired
private IDelayQueue<Order> iDelayQueue;
public void consumption() {
boolean result = false;
StopWatch stopWatch = new StopWatch();
stopWatch.start();
List<Order> messages = iDelayQueue.dequeue();
if (!CollectionUtils.isEmpty(messages)) {
result = true;
log.info("订单消息处理定时任务开始执行......");
// 集合等分放到线程池中执行
List<List<Order>> partition = Lists.partition(messages, 2);
int size = partition.size();
final CountDownLatch latch = new CountDownLatch(size);
for (List<Order> p : partition) {
async(new OrderConsumer.ConsumeTask(p, latch));
}
try {
latch.await();
} catch (InterruptedException ignore) {
log.error("InterruptedException====>>", ignore);
}
}
if (result) {
stopWatch.stop();
log.info("订单消息处理定时任务执行完毕,耗时:{} ms......", stopWatch.getTotalTimeMillis());
}
}
@RequiredArgsConstructor
private static class ConsumeTask implements Runnable {
private final List<Order> orders;
private final CountDownLatch latch;
@Override
public void run() {
try {
for (Order order : orders) {
try {
log.info("延迟处理成功!订单信息:{}", order);
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
latch.countDown();
}
}
}
}
三、演示
总结
基于Redis和Scheduled短轮询给出了一个完整的示例。如果需要在生产环境运行还是建议把Scheduled换成Quartz。当前的示例只是处于可运行的状态,代码中的参数需要根据环境来配置。如果队列数据很大则可以根据id取模分片到不同队列。
完
感谢您的阅读
如果你发现了错误的地方,可以在留言区提出来,我对其加以修改。
本文地址:https://blog.csdn.net/qq_39140300/article/details/114261063