欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Redis实战 -- 延时队列实现

程序员文章站 2022-05-22 20:35:04
...

Redis实现 延迟消息队列

什么是任务队列

在处理Web客户端发送的命令请求时,某些操作的执行时间可能会比我们预期的更长一些。通过将待执行任务的相关信息放入队列里面,并在之后对队列进行处理,这样用户可以推迟执行那些需要一段时间才能完成的操作,这种将工作交给任务处理器来执行的做法称为任务队列

什么可以实现任务队列

一说到任务队列,目前在工作中使用最多的就是RabbitMQ,但是RabbitMQ使用起来比较复杂,你需要先创建Exchange,再创建Queue,要将Exchange和Queue以某种规则绑定起来,发消息的时候还要指定routing-key,还要控制头部消息。消费者在消费消息之前也要进行上面一系列的繁琐过程。但是绝大多数情况下,虽然我们的消息队列只有一组消费者,但还是需要经历上面这些繁琐的过程。

使用Redis实现异步消息队列

所以本案例主要使用Redis来,针对消费者只有一组的消息队列,使用Redis就可以轻松的实现搞定。 但是Redis实现的不是高级的消息队列,它没有高级特性,没有ack保证,如果对消息有着极致的性能追求,那么它是不适合的。

Redis的 list列表数据结构常用来作为异步消息队列使用,例如实现一个先进先出的队列,我们可以使用lpush/rpop或者rpush/lpop这样对称的命令,push会把数据塞到列表中,pop则会从列表中移除数据,以下为代码示例。
Redis实战 -- 延时队列实现
控制台打印过程信息:
Redis实战 -- 延时队列实现
这样我们就完成了一个简单的使用redis实现的消息队列功能,需要处理的消息存放到key名为test_queue的列表中存放,同时模拟了消费者一直使用while循环来监听改列表,一旦拿到的数据不为空,就进行业务处理。但是使用redis实现的消息队列不具备Ack机制,可靠性得不到保证,使用的时候一定要注意。

实现延时队列功能

上面的例子我们已经了一个简易的消息队列。我们继续思考一个现实的场景,假定这些是一些游戏商品,它需要添加"延迟销售"特性,在未来某个时候才可以开始处理这些游戏商品数据。 那么要实现这个延迟的特性,我们需要修改现有队列的实现:

  1. 在消息数据的信息中包含延迟处理消息的执行时间,如果工作进程发现消息的执行时间还没到,那么它将会在短暂的等待之后重新把消息数据推入队列中。(延迟发送消息)
  2. 使用有序集合来存储这些需要延时消费的消息数据,将任务的执行时间设置为分值,在开启一个工作进程查找有序集合里面是否有可以立刻执行的任务,如果有的话就从有序集合中移除消息并且消费。

*以下代码示例使用方案2实现:
public class RedisDelayQueue {

static class TaskItem<T> {

    public String id;
    public T msg;
}

// fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
private Type TaskType = new TypeReference<TaskItem<T>>() {
}.getType();

private String queueKey;
public RedisTemplate redisTemplate;

public RedisDelayQueue(RedisTemplate redisTemplate,String queueKey){
    this.queueKey = queueKey;
}

public void delay(T msg){

    TaskItem<T> item = new TaskItem<T>();
    //分配唯一的uuid
    item.id = UUID.randomUUID().toString();
    item.msg = msg;

    //fastjson序列化
    String s = JSON.toJSONString(item);

    //塞入延时队列,5s后再试
    redisTemplate.opsForZSet().add(queueKey,s,System.currentTimeMillis() + 5000);

}

public void loop(){

    while(!Thread.interrupted()){
        //只取一条
        Set<String> values = redisTemplate  .opsForZSet().rangeByScore(queueKey,0,System.currentTimeMillis(),0,1);
        if(values.isEmpty()){

            try {
                //歇会继续
                Thread.sleep(500);
            } catch (InterruptedException e) {
                break;
            }

            continue;
        }

        String s = values.iterator().next();
        if(redisTemplate.opsForZSet().remove(queueKey,s) > 0){
            //多进程同时调用,只有一个会remove成功
            TaskItem<T> task = JSON.parseObject(s, TaskType);
            //执行业务逻辑
            handleTask(task.msg);
        }

    }
}

private void handleTask(T msg) {

    System.out.println(msg);
}

}

测试用例:
Redis实战 -- 延时队列实现
输出结果:为了保证效果,我的消息设置了50s,这样就可以比较显示的看到效果。
Redis实战 -- 延时队列实现

总结

以上我们了解了使用Redis的列表来实现简单的FIFO队列实现;还是用ZSet来实现一个延时队列。我们可以拿RabbitMQ的延时队列实现做队列,可以发现Redis实现的延时队列更为简单。

此外,对于上述示例的优化,在该任务被多个进程同时去争抢时,没有抢到执行zrem命令的线程就是白白执行了一次取ZSet数据的任务,造成了浪费,可以考虑使用lua scripting进行优化。