基于Redis实现阻塞队列
日常需求开发过程中,不免会遇到需要通过代码进行异步处理的情况,比如批量发送邮件,批量发送短信,数据导入,为了减少用户的等待,不希望一直菊花转啊转,因此需要进行异步处理,做法就是讲要处理的数据添加到队列当中,然后按照排队的先后顺序进行异步处理。
这个队列,可以是专业的消息队列,如 rocketmq/rabbitmq 等,一般项目中,如果只是为了进行异步,未免有点杀鸡用牛刀的意味。
也可以使用基于 jvm 内存实现队列,但是如果项目进行了重启,就会造成队列数据丢失。
大部分的项目都会用到 redis 中间件作为缓存使用,此时使用 redis 的 list 结构来实现队列则是非常合适的选择。
因此,本文主要讲解基于 redis 的方式实现异步队列。
本文首发个人技术博客: https://nullpointer.pw/redis-block-queue.html
基于 redis 的 list 实现队列的方式也有多种,先说第一种不推荐的方式,即使用lpush
生产消息,然后 while(true) 中通过rpop
消费消息,这种方式的确可以实现,但是不断代码不断的轮询,势必会消耗一些系统的资源。
第二种方式也是不推荐的方式,也是通过 lpush
生产消息,然后通过 brpop
进行阻塞地等待并消费消息,这种方式较第一种方式减少了无用的轮询,降低系统资源的消耗,但是可能会存在队列消息丢失的情况,如果取出了消息然后处理失败,这个被取出的消息就将丢失。
第二种方式就是下文要介绍的方式,首先也是通过 lpush
生产消息,然后通过 brpoplpush
阻塞地等待 list 新消息到来,有了新消息才开始消费,同时将消息备份到另外一个 list 当中,这种方式具备了第二种方式的优点,即减少了无用的轮询,同时也对消息进行了备份不会丢失数据,如果处理成功,可以通过 lrem
对备份的 list 中当前的这条消息进行删除处理。这种方式实现方式可以参考 模式: 安全的队列 .
redis 基础
# 将一个或多个值 value 插入到列表 key 的表头 lpush key value [value …] # 阻塞式等待,将列表 source 中的最后一个元素 (尾元素) 弹出,并返回给客户端。将 source 弹出的元素插入到列表 destination ,作为 destination 列表的的头元素。超时参数 timeout 接受一个以秒为单位的数字作为值。超时参数设为 0 表示阻塞时间可以无限期延长 (block indefinitely) 。 brpoplpush source destination timeout # 根据参数 count 的值,移除列表中与参数 value 相等的元素。 lrem key count value
代码实现队列消息生产者
笔者使用的是 spring 相关 api 实现对 redis 指令的调用。首先实现消息的生产代码,封装到一个工具类方法当中。这里很简单,就是调用了 lpush 方法,将序列化的 key 和 value 添加到列表当中去。
@resource private redisconnectionfactory connectionfactory; public void lpush(@nonnull string key, @nonnull string value) { redisconnection connection = redisconnectionutils.getconnection(connectionfactory); try { byte[] bytekey = redisserializer.string().serialize(getkey(key)); byte[] bytevalue = redisserializer.string().serialize(value); assert bytekey != null; connection.lpush(bytekey, bytevalue); } finally { redisconnectionutils.releaseconnection(connection, connectionfactory); } }
代码实现队列消息消费者
因为实现队列消费消息的代码比较多,不可能每个需要阻塞消费的地方,对需要写这一坨代码,因此使用 java8 的函数式接口实现方法的传递,同时阻塞式获取消息代码使用新线程去执行。
有人看到以下代码要吐槽了,不是说不用 while(true) 吗,怎么你这里面还是有,这里稍微解释一下,因为 springboot 一般会指定 timeout 的全局超时时间,即使 brpoplpush
设置了 0,即无限期,当超出了 timeout 设置的值时,就会抛出 querytimeoutexception 异常导致线程退出,因此添加了 try/catch 对异常进行捕获并忽略,同时使用 while(true) 保证线程可以继续执行。
代码中记录了当前消息处理结果,如果处理结果为成功,需要对备份队列的当前消息进行删除。
public void brpoplpush(@nonnull string key, consumer<string> consumer) { completablefuture.runasync(() -> { redisconnection connection = redisconnectionutils.getconnection(connectionfactory); try { byte[] srckey = redisserializer.string().serialize(getkey(key)); byte[] dstkey = redisserializer.string().serialize(getbackupkey(key)); assert srckey != null; assert dstkey != null; while (true) { byte[] bytevalue = new byte[0]; boolean success = false; try { bytevalue = connection.brpoplpush(0, srckey, dstkey); if (bytevalue != null && bytevalue.length != 0) { consumer.accept(new string(bytevalue)); success = true; } } catch (exception ignored) { // 防止获取 key 达到超时时间抛出 querytimeoutexception 异常退出 } finally { if (success) { // 处理成功才删除备份队列的 key connection.lrem(dstkey, 1, bytevalue); } } } } finally { redisconnectionutils.releaseconnection(connection, connectionfactory); } }); }
测试代码
@test public void testlpush() throws interruptedexception { string queuea = "queuea"; int i = 0; while (true) { string msg = "hello-" + i++; redisblockqueue.lpush(queuea, msg); system.out.println("lpush: " + msg); thread.sleep(3000); } } @test public void testbrpoplpush() { string queuea = "queuea"; redisblockqueue.brpoplpush(queuea, (val) -> { // 在这里处理具体的业务逻辑 system.out.println("val: " + val); }); // 防止 junit 进程退出 locksupport.park(); }
项目使用方式
为了方便使用,我将其抽取为了一个工具类,使用时通过 spring 注入使用即可,
队列消费可以使用如下方式在项目启动的时候就进行阻塞监听队列,等待消费
@resource private redisblockqueue redisblockqueue; @postconstruct public void init() { redisblockqueue.brpoplpush(xx, (value) -> { //... }); }
本文完整代码下载github 地址
到此这篇关于基于redis实现阻塞队列的文章就介绍到这了,更多相关redis阻塞队列内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
上一篇: Oracle手动建库安装部署超详细教程