欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

使用Redis实现延时消息队列(Sorted Set)

程序员文章站 2022-06-26 17:05:46
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档文章目录一、场景设计二、延时队列实现Sorted Set相关命令三、演示总结一、场景设计1.用户下单15分钟未付款,取消订单恢复库存.二、延时队列实现订单创建的时候,订单ID和当前时间戳分别作为Sorted Set的member和score添加到订单队列Sorted Set中线程通过Sorted Set的命令ZREVRANGEBYSCORE弹出指定数量的订单ID进行处理。项目3Sorted Set相关命令三、演示总结...

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


一、场景设计

1.用户下单15分钟未付款,取消订单恢复库存.

二、延时队列实现

  1. 订单创建的时候,订单ID和当前时间戳分别作为Sorted Set的member和score添加到订单队列Sorted Set中
  2. 通过Sorted Set的命令ZREVRANGEBYSCORE返回指定分数区间内的所有订单ID进行处理。
  3. 同时删除返回指定分数区间内的所有数据,并且由于指定分数区间内的所有数据已经在Redis中删除,如果数据处理失败则可能需要从数据库重新查询补偿。
  4. 第二步和第三部必须要保存原子性所有必须在lua脚本中执行

Sorted Set相关命令

  1. ZADD命令 - 将一个或多个成员元素及其分数值加入到有序集当中
    zadd key score1 value1… scoren valuen

  2. ZREVRANGEBYSCORE命令 - 返回有序集中指定分数区间内的所有的成员。有序集成员按分数值递减(从大到小)的次序排列。
    zrevrangebyscore key max min [withscores] [limit offset count]
    使用Redis实现延时消息队列(Sorted Set)

  3. ZREM命令 - 用于移除有序集中的一个或多个成员,不存在的成员将被忽略
    zrem key member [member …]

RedisDelayQueue

延迟队列接口


/**
 * 延迟队列接口
 *
 * @Author:hjk
 * @Date:2021/3/1 14:11
 */
public interface IDelayQueue<T> {

    /**
     * 入队
     *
     * @param message
     */
    void enqueue(T message);

    /**
     * 出队
     *
     * @param min    分数区间 - 最大分数
     * @param max    分数区间 - 最小分数
     * @param offset offset和limit原理和MySQL的LIMIT offset一致,如果不指定此参数则返回整个集合的数据
     * @param limit
     * @return
     */
    List<T> dequeue(String min, String max, String offset, String limit);

    /**
     * 出队
     *
     * @return
     */
    List<T> dequeue();
}

延迟队列接口 实现



/**
 * 延迟队列接口 实现
 *
 * @Author:hjk
 * @Date:2021/3/1 14:14
 */
@Component
public class OrderDelayQueue implements IDelayQueue<Order>, InitializingBean {

    /**
     * 分数区间 - 最小分数 默认最小值
     */
    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    /**
     * 延迟队列名称
     */
    private static final String ORDER_QUEUE = "ORDER_DELAY_QUEUE";
    private static final String DEQUEUE_LUA = "dequeue.lua";
    private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();


    @Resource
    public RedisTemplate<Object, Object> redisTemplate;

    @Override
    public void enqueue(Order order) {
        //60秒后执行
        String s = String.valueOf(order.getCreateTime().getTime() + 60 * 1000);
        redisTemplate
                .opsForZSet()
                .add(ORDER_QUEUE, JSON.toJSONString(order), Double.parseDouble(s));
    }

    @Override
    public List<Order> dequeue(String min, String max, String offset, String limit) {
        RedisScript<List<String>> redisScript = RedisScript.of(DEQUEUE_LUA_SHA.get(), List.class);
        List<Object> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(min);
        keys.add(max);
        keys.add(offset);
        keys.add(limit);
        List<String> list = redisTemplate.execute(redisScript, keys);
        List<Order> result = new ArrayList<>();
        if (!CollectionUtils.isEmpty(list)) {
            for (String order : list) {
                if (StringUtils.isNotBlank(order)) {
                    result.add(JSON.parseObject(order, Order.class));
                }
            }
        }
        return result;
    }

    @Override
    public List<Order> dequeue() {
        //zset 分数区间 - 最大分数 为当前时间戳
        String maxScore = String.valueOf(System.currentTimeMillis());
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT);
    }

    /**
     * 实现 InitializingBean 在初始化bean的时候都会执行该方法
     *
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        ClassPathResource resource = new ClassPathResource(DEQUEUE_LUA);
        String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
        //如果当前值 ==为预期值,则将luaContent设置为给定的更新值
        DEQUEUE_LUA_SHA.compareAndSet(null, luaContent);
    }

}


lua

local zset_key = KEYS[1]
local min_score = KEYS[2]
local max_score = KEYS[3]
local offset = KEYS[4]
local limit = KEYS[5]
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            redis.call('ZREM', zset_key, unpack(list))
            return list
            else
        end
    end
end


Consumer

@Component
public class OrderConsumer {
    private static final Logger log = LoggerFactory.getLogger(OrderMessageConsumer.class);

    @Autowired
    private IDelayQueue<Order> iDelayQueue;

    public void consumption() {
        boolean result = false;
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        List<Order> messages = iDelayQueue.dequeue();
        if (!CollectionUtils.isEmpty(messages)) {
            result = true;
            log.info("订单消息处理定时任务开始执行......");
            // 集合等分放到线程池中执行
            List<List<Order>> partition = Lists.partition(messages, 2);
            int size = partition.size();
            final CountDownLatch latch = new CountDownLatch(size);
            for (List<Order> p : partition) {
                async(new OrderConsumer.ConsumeTask(p, latch));
            }
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                log.error("InterruptedException====>>", ignore);
            }
        }
        if (result) {
            stopWatch.stop();
            log.info("订单消息处理定时任务执行完毕,耗时:{} ms......", stopWatch.getTotalTimeMillis());
        }
    }


    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final List<Order> orders;
        private final CountDownLatch latch;

        @Override
        public void run() {
            try {
                for (Order order : orders) {
                    try {
                        log.info("延迟处理成功!订单信息:{}", order);
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } finally {
                latch.countDown();
            }
        }
    }
}

三、演示

使用Redis实现延时消息队列(Sorted Set)

总结

基于Redis和Scheduled短轮询给出了一个完整的示例。如果需要在生产环境运行还是建议把Scheduled换成Quartz。当前的示例只是处于可运行的状态,代码中的参数需要根据环境来配置。如果队列数据很大则可以根据id取模分片到不同队列。

感谢您的阅读

如果你发现了错误的地方,可以在留言区提出来,我对其加以修改。

本文地址:https://blog.csdn.net/qq_39140300/article/details/114261063

相关标签: redis 队列 java