Redis5新特性Streams作消息队列
前言
redis 5 新特性中,streams 数据结构的引入,可以说它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,redis 作为消息队列使用时,得到更完善,更强大的原生支持,其中尤为明显的是持久化消息队列。同时,stream 借鉴了 kafka 的消费组模型概念和设计,使消费消息处理上更加高效快速。本文就 streams 数据结构中常用 api 进行分析。
准备
本文所使用 redis 版本为 5.0.5 。如果使用更早的 5.x 版本,有些 api 使用效果,与本文中描述略有不同。
添加消息
streams 添加数据使用 xadd 指令进行添加,消息中的数据以 k-v 键值对的形式进行操作。一条消息可以存在多个键值对,添加命令格式:
xadd key id field string [field string ...]
其中 key 为 streams 的名称,id 为消息的唯一标志,不可重复,field string 就为键值对。下面我们就添加以 person 为名称的流,进行操作。
xadd person * name ytao des https://ytao.top
上面添加案例中,id 使用 * 号复制,这里代表着服务端自动生成 id,添加后返回数据 "1578238486193-0"
这里自动生成的 id 格式为 <millisecondstime>-<sequencenumber>
id 是由两部分组成:
- millisecondstime 为当前服务器时间毫秒时间戳。
- sequencenumber 当前序列号,取值来源于当前毫秒内,生成消息的顺序,默认从 0 开始加 1 递增。
比如:1578238486193-3 表示在 1578238486193 毫秒的时间戳时,添加的第 4 条消息。
除了服务端自动生成 id 方式外,也支持指定 id 的生成,但是指定 id 有以下条件限制:
- id 中的前后部分必须为数字。
- 最小 id 为 0-1,不能为 0-0,但是 2-0,3-0 .... 是被允许的。
- 添加的消息,id 的前半部分不能比存在 id 最大的值小,id 后半部分不能比存在前半部分相同的最大后半部分小。
否则,当不满足上述条件时,添加后会抛出异常:
(error) err the id specified in xadd is equal or smaller than the target stream top item
实际上,当添加一条消息时,会进行两部操作。第一步,先判断如果不存在 streams,则创建 streams 的名称,再添加消息到 streams 中。即使添加消息时,由于 id 异常,也可以在 redis 中存在以当前 streams 的名称。
streams 中 id 也可作为指针使用,因为它是一个有序的标记。
生产中,如果这样使用添加消息,会存在一个问题,那就是消息数量太大时,会使服务宕机。这里 streams 的设计初期也有考虑到这个问题,那就是可以指定 streams 的容量。如果容量操作这个设定的值,就会对调旧的消息。在添加消息时,设置 maxlen
参数。
xadd person maxlen 5 * name ytao des https://ytao.top
这样就指定该了 streams 中的容量为 5 条消息。也可使用 xtrim 截取消息,从小到大剔除多余的消息:
xtrim person maxlen 8
消息数量
查看消息数量使用 xlen 指令进行操作。
xlen key
例:查看 person 流中的消息数量:
> xlen person (integer) 5
查询消息
查询 streams 中的消息使用 xrange 和 xrevrange 指令。
xrange
查询数据时,可以按照指定 id 范围进行查询,xrange 查询指令格式:
xrange key start end [count count]
参数说明:
- key 为 streams 的名称
- start 为范围查询开始 id,包含本 id。
- start 为范围查询结束 id,包含本 id。
- count 为查询返回最大的消息数量,非必填。
这里 start 和 end 有-
和+
两个非指定值,他们分别表示无穷小和无穷大,所以当使用这个两个值时,会查询出全部的消息。
> xrange person - + 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!" 3) 1) "2-0" 2) 1) "name" 2) "gaga" 3) "des" 4) "fishion!"
上面查询的消息数据,可以看到是按照先进先出的顺序查询出来的。
使用 count 指定查询返回的数量:
# 查询所有的消息,并且返回一条数据 > xrange person - + count 1 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top"
在范围查询中,id 的后半部分可省略,后半部分中的数据会全部查询到。
xrevrange
xrevrange 的查询和 xrange 指令中的使用类似,但查询的 start 和 end 参数顺序进行了调换:
xrevrange key end start [count count]
使用案例:
> xrevrange person + - 1) 1) "2-0" 2) 1) "name" 2) "gaga" 3) "des" 4) "fishion!" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!" 3) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top"
查询后的结果与 xrange 的结果顺序刚好相反,其他都一样,这两个指令可进行消息的升序和降序的返回。
删除消息
删除消息使用 xdel 指令操作,只需指定将要删除的 streams 名称和 id 即可,支持一次删除多个消息 。
xdel key id [id ...]
删除案例:
# 查询所有消息 > xrange person - + 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!" 3) 1) "2-0" 2) 1) "name" 2) "gaga" 3) "des" 4) "fishion!" # 删除消息 > xdel person 2-0 (integer) 1 # 再次查询删除后的所有消息 > xrange person - + 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!" # 查询删除后的长度 > xlen person (integer) 2
从上面可以看到,删除消息后,长度也会减少相应的数量。
消费消息
在 redis 的 pub/sub 中,我们是通过订阅来消费消息,在 streams 数据结构中,同样也能实现同等功能,当没有新的消息时,可进行阻塞等待。不仅支持单独消费,而且还可以支持群组消费。
单独消费
单独消费使用 xread 指令。可以看到,下面命令中,streams,key, 以及 id 为必填项。id 表示将要读取大于该 id 的消息。当 id 值使用 $
赋予时,表示已存在消息的最大 id 值。
xread [count count] [block milliseconds] streams key [key ...] id [id ...]
上面的 count
参数用来指定读取的最大数量,与 xrange 的用法一样。
> xread count 1 streams person 0 1) 1) "person" 2) 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" > xread count 2 streams person 0 1) 1) "person" 2) 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!"
在 xread 里面还有个 block
参数,这个是用来阻塞订阅消息的,block
携带的参数为阻塞时间,单位为毫秒,如果在这个时间内没有新的消息消费,那么就会释放该阻塞。当这里的时间指定为 0 时,会一直阻塞,直到有新的消息来消费到。
# 窗口 1 开启阻塞,等待新消息的到来 > xread block 0 streams person $ # 另开一个连接窗口 2,添加一条新的消息 > xadd person 2-2 name tao des coder "2-2" # 窗口 1,获取到有新的消息来消费,并且带有阻塞的时间 > xread block 0 streams person $ 1) 1) "person" 2) 1) 1) "2-2" 2) 1) "name" 2) "tao" 3) "des" 4) "coder" (60.81s)
当使用 xread 进行顺序消费时,需要额外记录下读取到位置的 id,方便下次继续消费。
群组消费
群组消费的主要目的也就是为了分流消息给不同的客户端处理,以更高效的速率处理消息。为达到这一肝功能需求,我们需要做三件事:创建群组,群组读取消息,向服务端确认消息以处理。
群组操作
操作群组使用 xgroup 指令:
xgroup [create key groupname id-or-$] [setid key id-or-$] [destroy key groupname] [delconsumer key groupname consumername]
上面命令中,包含操作有:
- create 创建消费组。
- setid 修改下一个处理消息的 id。
- destroy 销毁消费组。
- delconsumer 删除消费组中指定的消费者。
我们当前需要使用的是创建消费组:
# 以当前存在的最大 id 作为消费起始 > xgroup create person group1 $ ok
群组读取消息
群组读取使用 xreadgroup 指令,count
和block
的使用类似 xread 的操作,只是多了个群组和消费者的指定:
xreadgroup group group consumer [count count] [block milliseconds] streams key [key ...] id [id ...]
由于群组消费和单独消费类似,这里只进行个阻塞分析,这里 id 也有个特殊值>
,表示还未进行消费的消息:
# 窗口 1,消费群组中,taotao 消费者建立阻塞监听 xreadgroup group group1 taotao block 0 streams person > # 窗口 2,消费群组中,yangyang 消费者建立阻塞监听 xreadgroup group group1 yangyang block 0 streams person > # 窗口 3,添加消费消息 > xadd person 3-1 name tony des 666 "3-1" # 窗口 1,读取到新消息,此时 窗口 2 没有任何反应 > xreadgroup group group1 taotao block 0 streams person > 1) 1) "person" 2) 1) 1) "3-1" 2) 1) "name" 2) "tony" 3) "des" 4) "666" (77.54s) # 窗口 3,再次添加消费消息 > xadd person 3-2 name james des abc! "3-2" # 窗口 2,读取到新消息,此时 窗口 1 没有任何反应 > xreadgroup group group1 yangyang block 0 streams person > 1) 1) "person" 2) 1) 1) "3-2" 2) 1) "name" 2) "james" 3) "des" 4) "abc!" (76.36s)
以上执行流程中,group1 群组中有两个消费者,当添加两条消息后,这两个消费者轮流消费。
消息ack
消息消费后,为避免再次重复消费,这是需要向服务端发送 ack,确保消息被消费后的标记。
例如下列情况,我们上面我们将最新两条消息已进行了消费,但是当我们再次读取消息时,还是被读到:
> xreadgroup group group1 yangyang streams person 0 1) 1) "person" 2) 1) 1) "3-2" 2) 1) "name" 2) "james" 3) "des" 4) "abc!"
这时,我们使用 xack 指令告诉服务器,我们已处理的消息:
xack key group id [id ...]0
让服务器标记 3-2 已处理:
> xack person group1 3-2 (integer) 1
再次获取群组读取消息:
> xreadgroup group group1 yangyang streams person 0 1) 1) "person" 2) (empty list or set)
队列中没有了可读消息。
除了上面以讲解到的 api 外,查看消费群组信息可使用 xinfo 指令查看,本文不做分析。
总结
上面对 streams 常用 api 进行了分析,我们可以感受到 redis 在消息队列支持的道路上,也越来越强大。如果使用过它的 pub/sub 功能的话,就会感受到 5.x 迭代正是将你的一些痛点进行了优化。
个人博客:
关注公众号 【ytao】,更多原创好文
上一篇: SSM整合
下一篇: Linux安装和卸载MySQL5.7