Redis实战 -- 延时队列实现
Redis实现 延迟消息队列
什么是任务队列
在处理Web客户端发送的命令请求时,某些操作的执行时间可能会比我们预期的更长一些。通过将待执行任务的相关信息放入队列里面,并在之后对队列进行处理,这样用户可以推迟执行那些需要一段时间才能完成的操作,这种将工作交给任务处理器来执行的做法称为任务队列。
什么可以实现任务队列
一说到任务队列,目前在工作中使用最多的就是RabbitMQ,但是RabbitMQ使用起来比较复杂,你需要先创建Exchange,再创建Queue,要将Exchange和Queue以某种规则绑定起来,发消息的时候还要指定routing-key,还要控制头部消息。消费者在消费消息之前也要进行上面一系列的繁琐过程。但是绝大多数情况下,虽然我们的消息队列只有一组消费者,但还是需要经历上面这些繁琐的过程。
使用Redis实现异步消息队列
所以本案例主要使用Redis来,针对消费者只有一组的消息队列,使用Redis就可以轻松的实现搞定。 但是Redis实现的不是高级的消息队列,它没有高级特性,没有ack保证,如果对消息有着极致的性能追求,那么它是不适合的。
Redis的 list列表数据结构常用来作为异步消息队列使用,例如实现一个先进先出的队列,我们可以使用lpush/rpop或者rpush/lpop这样对称的命令,push会把数据塞到列表中,pop则会从列表中移除数据,以下为代码示例。
控制台打印过程信息:
这样我们就完成了一个简单的使用redis实现的消息队列功能,需要处理的消息存放到key名为test_queue的列表中存放,同时模拟了消费者一直使用while循环来监听改列表,一旦拿到的数据不为空,就进行业务处理。但是使用redis实现的消息队列不具备Ack机制,可靠性得不到保证,使用的时候一定要注意。
实现延时队列功能
上面的例子我们已经了一个简易的消息队列。我们继续思考一个现实的场景,假定这些是一些游戏商品,它需要添加"延迟销售"特性,在未来某个时候才可以开始处理这些游戏商品数据。 那么要实现这个延迟的特性,我们需要修改现有队列的实现:
- 在消息数据的信息中包含延迟处理消息的执行时间,如果工作进程发现消息的执行时间还没到,那么它将会在短暂的等待之后重新把消息数据推入队列中。(延迟发送消息)
- 使用有序集合来存储这些需要延时消费的消息数据,将任务的执行时间设置为分值,在开启一个工作进程查找有序集合里面是否有可以立刻执行的任务,如果有的话就从有序集合中移除消息并且消费。
*以下代码示例使用方案2实现:
public class RedisDelayQueue {
static class TaskItem<T> {
public String id;
public T msg;
}
// fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
private Type TaskType = new TypeReference<TaskItem<T>>() {
}.getType();
private String queueKey;
public RedisTemplate redisTemplate;
public RedisDelayQueue(RedisTemplate redisTemplate,String queueKey){
this.queueKey = queueKey;
}
public void delay(T msg){
TaskItem<T> item = new TaskItem<T>();
//分配唯一的uuid
item.id = UUID.randomUUID().toString();
item.msg = msg;
//fastjson序列化
String s = JSON.toJSONString(item);
//塞入延时队列,5s后再试
redisTemplate.opsForZSet().add(queueKey,s,System.currentTimeMillis() + 5000);
}
public void loop(){
while(!Thread.interrupted()){
//只取一条
Set<String> values = redisTemplate .opsForZSet().rangeByScore(queueKey,0,System.currentTimeMillis(),0,1);
if(values.isEmpty()){
try {
//歇会继续
Thread.sleep(500);
} catch (InterruptedException e) {
break;
}
continue;
}
String s = values.iterator().next();
if(redisTemplate.opsForZSet().remove(queueKey,s) > 0){
//多进程同时调用,只有一个会remove成功
TaskItem<T> task = JSON.parseObject(s, TaskType);
//执行业务逻辑
handleTask(task.msg);
}
}
}
private void handleTask(T msg) {
System.out.println(msg);
}
}
测试用例:
输出结果:为了保证效果,我的消息设置了50s,这样就可以比较显示的看到效果。
总结
以上我们了解了使用Redis的列表来实现简单的FIFO队列实现;还是用ZSet来实现一个延时队列。我们可以拿RabbitMQ的延时队列实现做队列,可以发现Redis实现的延时队列更为简单。
此外,对于上述示例的优化,在该任务被多个进程同时去争抢时,没有抢到执行zrem命令的线程就是白白执行了一次取ZSet数据的任务,造成了浪费,可以考虑使用lua scripting进行优化。