消息队列之 RabbitMQ
关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时候把这块的知识整理记录一下了。
市面上的消息队列产品有很多,比如老牌的 ActiveMQ、RabbitMQ ,目前我看最火的 Kafka ,还有 ZeroMQ ,去年底阿里巴巴捐赠给 Apache 的 RocketMQ ,连 redis 这样的 NoSQL 数据库也支持 MQ 功能。总之这块知名的产品就有十几种,就我自己的使用经验和兴趣只打算谈谈 RabbitMQ、Kafka 和 ActiveMQ ,本文先讲 RabbitMQ ,在此之前先看下消息队列的相关概念。
什么叫消息队列
消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
为何用消息队列
从上面的描述中可以看出消息队列是一种应用间的异步协作机制,那什么时候需要使用 MQ 呢?
以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。
以上是用于业务解耦的情况,其它常见场景包括最终一致性、广播、错峰流控等等。
RabbitMQ 特点
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
RabbitMQ 中的概念模型
消息模型
所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。
RabbitMQ 基本概念
上面只是最简单抽象的描述,具体到 RabbitMQ 则有更详细的概念需要解释。上面介绍过 RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Connection
网络连接,比如一个TCP连接。
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
Broker
表示消息队列服务器实体。
AMQP 中的消息路由
AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
Exchange 类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
1、direct
direct 交换器
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
2、fanout
fanout 交换器
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
3、topic
topic 交换器
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。
消息的存储
RabbitMQ的消息默认是在内存里的,实际上不光是消息,Exchange路由等信息实际都在内存中。内存的优点是高性能,问题在于故障后无法恢复。所以RabbitMQ也支持持久化的存储,也就是写磁盘。
要在RabbitMQ中持久化消息,要同时满足三个条件:
- 消息投体时使用持久化投递模式
- 目标交换器是配置为持久化的
- 目标队列是配置为持久化的
RabbitMQ持久化消息的方式是常见的写日志方式:
当一条持久化消息发送到持久化的Exchange上时,RabbitMQ会在消息提交到日志文件后,才发送响应
一旦这条消息被消费后,RabbitMQ会将会把日志中该条消息标记为等待垃圾收集,之后会从日志中清除
如果出现故障,自动重建Exchange,Bindings和Queue,同时通过重播持久化日志来恢复消息。
消息持久化的优缺点很明显,拥有故障恢复能力的同时,也带来了性能的急剧下降。同时,由于RabbitMQ默认情况下是没有冗余的,假设一个持久化节点崩溃,一致到该节点恢复前,消息和队列都无法恢复。
2、生产者消息确认机制
发后即忘
RabbitMQ默认发布消息是不会返回任何结果给生产者的,所以存在发送过程中丢失数据的风险
AMQP事务
AMQP事务保证RabbitMQ不仅收到了消息,并成功将消息路由到了所有匹配的订阅队列,AMQP事务将使得生产者和RabbitMQ产生同步。
虽然事务使得生产者可以确定消息已经到达RabbitMQ中的对应队列,但是却会降低2~10倍的消息吞吐量。
发送方确认
开启发送方确认模式后,消息会有一个唯一的ID,一旦消息被投递给所有匹配的队列后,会回调给发送方应用程序(包含消息的唯一ID),使得生产者知道消息已经安全到达队列了。
如果消息和队列是配置成了持久化,这个确认消息只会在队列将消息写入磁盘后才会返回。如果RabbitMQ内部发生了错误导致这条消息丢失,那么RabbitMQ会发送一条nack消息,当然我理解这个是不能保证的。
这种模式由于不存在事务回滚,同时整体仍然是一个异步过程,所以更加轻量级,对服务器性能的影响很小。
3、消费者消息确认机制
为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消费者消息确认机制(message acknowledgement)。采用消息确认机制之后,消费者就有足够的时间来处理消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直等待并持有消息,直到消费者确认了该消息。
4、死信队列
DLX,Dead Letter Exchange 的缩写,又死信邮箱、死信交换机。DLX就是一个普通的交换机,和一般的交换机没有任何区别。 当消息在一个队列中变成死信(dead message)时,通过这个交换机将死信发送到死信队列中(指定好相关参数,rabbitmq会自动发送)。
什么是死信呢?什么样的消息会变成死信呢?
- 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到mq中)
应用场景分析: 在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了 **如何使用死信交换机呢?
定义业务(普通)队列的时候指定参数:
x-dead-letter-exchange: 用来设置死信后发送的交换机
x-dead-letter-routing-key:用来设置死信的routingKey
@Bean
public Queue helloQueue() {
//将普通队列绑定到私信交换机上
Map<String, Object> args = new HashMap<>(2);
args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
Queue queue = new Queue(queueName, true, false, false, args);
return queue;
}
RabbitMQ集群
RabbitMQ集群的设计目标:
允许消费者和生产者在RabbitMQ节点崩溃的情况下继续运行
能过通过添加节点来线性扩展消息通信吞吐量
从实际结果看,RabbitMQ完成设计目标上并不十分出色,主要原因在于默认的模式下,RabbitMQ的队列实例子只存在在一个节点上(虽然后续也支持了镜像队列),既不能保证该节点崩溃的情况下队列还可以继续运行,也不能线性扩展该队列的吞吐量。
集群结构
RabbitMQ内部的元数据主要有:
- 队列元数据-队列名称和属性
- 交换器元数据-交换器名称,类型和属性
- 绑定元数据-路由信息
集群中的队列
这里有个问题需要思考,RabbitMQ 默认会将消息冗余到所有节点上吗?这样听起来正符合高可用的特性,只要集群上还有一个节点存活,那么就可以继续进行消息通信,但这也随之为 RabbitMQ 带来了致命的缺点:
每次发布消息,都要把它扩散到所有节点上,而且对于磁盘节点来说,每一条消息都会触发磁盘活动,这会导致整个集群内性能负载急剧拉升。
如果每个节点都有所有队列的完整内容,那么添加节点不会给你带来额外的存储空间,也会带来木桶效应,举个例子,如果集群内有个节点存储了 3G 队列内容,那么在另外一个只有 1G 存储空间的节点上,就会造成内存空间不足的情况,也就是无法通过集群节点的扩容提高消息积压能力。
解决这个问题就是通过集群中唯一节点来负责任何特定队列,只有该节点才会受队列大小的影响,其它节点如果接收到该队列消息,那么就要根据元数据信息,传递给队列所有者节点(也就是说其它节点上只存储了特定队列所有者节点的指针)。这样一来,就可以通过在集群内增加节点,存储更多的队列数据。
虽然RabbitMQ的队列实际只会在一个节点上,但元数据可以存在各个节点上。举个例子来说,当创建一个新的交换器时,RabbitMQ会把该信息同步到所有节点上,这个时候客户端不管连接的那个RabbitMQ节点,都可以访问到这个新的交换器,也就能找到交换器下的队列。
如上图所示,队列A的实例实际只在一个RabbitMQ节点上,其它节点实际存储的是只想该队列的指针。
为什么RabbitMQ不在各个节点间做复制了,《RabbitMQ实战》给出了两个原因:
存储成本-RabbitMQ作为内存队列,复制对存储空间的影响,毕竟内存是昂贵而有限的
性能损耗-发布消息需要将消息复制到所有节点,特别是对于持久化队列而言,性能的影响会很大
我理解成本这个原因并不完全成立,复制并不一定要复制到所有节点,比如一个队列可以只做两个副本,复制带来的内存成本可以交给使用方来评估,毕竟在内存中没有堆积的情况下,实际上队列是不会占用多大内存的。
还有一点是RabbitMQ本身并没有保证消息消费的有序性,所以实际上队列被Partition到各个节点上,这样才能真正达到线性扩容的目的(以RabbitMQ的现状来说,单队列实际是无法扩容的,只有在业务层做切分)。
场景1、客户端直接连接队列所在节点
如果有一个消息生产者或者消息消费者通过amqp-client的客户端连接至节点1进行消息的发布或者订阅,那么此时的集群中的消息收发只与节点1相关,这个没有任何问题;如果客户端相连的是节点2或者节点3(队列1数据不在该节点上),那么情况又会是怎么样呢?
场景2、客户端连接的是非队列数据所在节点
如果消息生产者所连接的是节点2或者节点3,此时队列1的完整数据不在该两个节点上,那么在发送消息过程中这两个节点主要起了一个路由转发作用,根据这两个节点上的元数据(也就是上文提到的:指向queue的owner node的指针)转发至节点1上,最终发送的消息还是会存储至节点1的队列1上。
同样,如果消息消费者所连接的节点2或者节点3,那这两个节点也会作为路由节点起到转发作用,将会从节点1的队列1中拉取消息进行消费。
注:RabbitMQ集群中的节点可以是内存节点也可以是磁盘节点,但要求至少有一个磁盘节点,这样出现故障时才能恢复数据。
补充说明:内存节点与磁盘节点
关于上面队列所说的问题与解决办法(即只保存交换元数据,而队列信息只保存在单个节点上),又有了一个伴随而来的问题出现:如果特定队列的所有者节点发生了故障,那么该节点上的队列和关联的绑定都会消失吗?
如果是内存节点,那么附加在该节点上的队列和其关联的绑定都会丢失,并且消费者可以重新连接集群并重新创建队列;
如果是磁盘节点,重新恢复故障后,该队列又可以进行传输数据了,并且在恢复故障磁盘节点之前,不能在其它节点上让消费者重新连到集群并重新创建队列,如果消费者继续在其它节点上声明该队列,会得到一个 404 NOT_FOUND 错误,这样确保了当故障节点恢复后加入集群,该节点上的队列消息不回丢失,也避免了队列会在一个节点以上出现冗余的问题。
接下来说说内存节点与磁盘节点在集群中的作用,在集群中的每个节点,要么是内存节点,要么是磁盘节点,如果是内存节点,会将所有的元数据信息仅存储到内存中,而磁盘节点则不仅会将所有元数据存储到内存上, 还会将其持久化到磁盘。
在单节点 RabbitMQ 上,仅允许该节点是磁盘节点,这样确保了节点发生故障或重启节点之后,所有关于系统的配置与元数据信息都会重磁盘上恢复;而在 RabbitMQ 集群上,允许节点上至少有一个磁盘节点,在内存节点上,意味着队列和交换器声明之类的操作会更加快速。原因是这些操作会将其元数据同步到所有节点上,对于内存节点,将需要同步的元数据写进内存就行了,但对于磁盘节点,意味着还需要及其消耗性能的磁盘写入操作。
RabbitMQ 集群只要求至少有一个磁盘节点,这是有道理的,当其它内存节点发生故障或离开集群,只需要通知至少一个磁盘节点进行元数据的更新,如果是碰巧唯一的磁盘节点也发生故障了,集群可以继续路由消息,但是不可以做以下操作了:
- 创建队列
- 创建交换器
- 创建绑定
- 添加用户
- 更改权限
- 添加或删除集群节点
这是因为上述操作都需要持久化到磁盘节点上,以便内存节点恢复故障可以从磁盘节点上恢复元数据,解决办法是在集群添加 2 台以上的磁盘节点,这样其中一台发生故障了,集群仍然可以保持运行,且能够在任何时候保存元数据变更。
镜像队列
镜像队列架构
RabbitMQ自己也考虑到了我们之前分析的单节点长时间故障无法恢复的问题,所以RabbitMQ 2.6.0之后它也支持了镜像队列,换个说法也就是副本。
除了发送消息,所有的操作实际都在主拷贝上,从拷贝实际只是个冷备(默认的情况下所有RabbitMQ节点上都会有镜像队列的拷贝),如果使用消息确认模式,RabbitMQ会在主拷贝和从拷贝都安全的接受到消息时才通知生产者。
从这个结构上来看,如果从拷贝的节点挂了,实际没有任何影响,如果主拷贝挂了,那么会有一个从新选主的过程,这也是镜像队列的优点,除非所有节点都挂了,才会导致消息丢失。重新选主后,RabbitMQ会给消费者一个消费者取消通知(Consumer Cancellation),让消费者重连新的主拷贝。
镜像队列原理
1.RabbitMQ结构
AMQPQueue:负责AMQP协议相关的消息处理,包括接收消息,投递消息,Confirm消息等
BackingQueue:提供AMQQueue调用的接口,完成消息的存储和持久化工作
BackingQueue由Q1,Q2,Delta,Q3,Q4五个子队列构成,在Backing中,消息的生命周期有四个状态:
Alpha:消息的内容和消息索引都在RAM中。(Q1,Q4)
Beta:消息的内容保存在Disk上,消息索引保存在RAM中。(Q2,Q3)
Gamma:消息的内容保存在Disk上,消息索引在DISK和RAM上都有。(Q2,Q3)
Delta:消息内容和索引都在Disk上。(Delta)
这里以持久化消息为例(可以看到非持久化消息的生命周期会简单很多),从Q1到Q4,消息实际经历了一个RAM->DISK->RAM这样的过程,BackingQueue这么设计的目的有点类似于Linux的Swap,当队列负载很高时,通过将部分消息放到磁盘上来节省内存空间,当负载降低时,消息又从磁盘回到内存中,让整个队列有很好的弹性。因此触发消息流动的主要因素是:1.消息被消费;2.内存不足。
RabbitMQ会更具消息的传输速度来计算当前内存中允许保存的最大消息数量(Traget_RAM_Count),当:内存中保存的消息数量+等待ACK的消息数量>Target_RAM_Count 时,RabbitMQ才会把消息写到磁盘上,所以说虽然理论上消息会按照Q1->Q2->Delta->Q3->Q4的顺序流动,但是并不是每条消息都会经历所有的子队列以及对应的生命周期。
从RabbitMQ的Backing Queue结构来看,当内部不足时,消息要经历多个生命周期,在Disk和RAM之间置换,者实际会降低RabbitMQ的处理性能(后续的流控就是关联的解决方法)。
2.镜像队列结构
所有对镜像队列主拷贝的操作,都会通过Guarented Multicasting(GM)同步到各个Salve节点,Coodinator负责组播结果的确认。
GM是一种可靠的组播通信协议,保证组组内的存活节点都收到消息。
GM的主播并不是由Master节点来负责通知所有Slave的(目的是为了避免Master压力过大,同时避免Master失效导致消息无法最终Ack),RabbitMQ把一个镜像队列的所有节点组成一个链表,由主拷贝发起,由主拷贝最终确认通知到了所有的Slave,而中间由Slave接力的方式进行消息传播。
从这个结构来看,消息完成整个镜像队列的同步耗时理论上是不低的,但是由于RabbitMQ消息的消息确认本身是异步的模式,所以整体的吞吐量并不会受到太大影响。
流控
当RabbitMQ出现内存(默认是0.4)或者磁盘资源达到阈值时,会触发流控机制,阻塞Producer的Connection,让生产者不能继续发送消息,直到内存或者磁盘资源得到释放。
RabbitMQ基于Erlang/OTP开发,一个消息的生命周期中,会涉及多个进程间的转发,这些Erlang进程之间不共享内存,每个进程都有自己独立的内存空间,如果没有合适的流控机制,可能会导致某个进程占用内存过大,导致OOM。因此,要保证各个进程占用的内容在一个合理的范围,RabbitMQ的流控采用了一种信用证机制(Credit),为每个进程维护了四类键值对:
{credit_from,From}-该值表示还能向消息接收进程From发送多少条消息
{credit_to,To}-表示当前进程再接收多少条消息,就要向消息发送进程增加Credit数量
credit_blocked-表示当前进程被哪些进程block了,比如进程A向B发送消息,那么当A的进程字典中{credit_from,B}的值为0是,那么A的credit_blocked值为[B]
credit_deferred-消息接收进程向消息发送进程增加Credit的消息列表,当进程被Block时会记录消息信息,Unblock后依次发送这些消息
如图所示,A进程当前可以发送给B的消息有100条,每发一次,值减1,直到为0,A才会被Block住。B消费消息后,会给A增加新的Credit,这样A才可以持续的发送消息。这里只画了两个进程,多进程串联的情况下,这中影响也就是从底向上传递的。
RabbitMQ的集群架构设计图
对于消息的生产和消费者可以通过HAProxy的软负载将请求分发至RabbitMQ集群中的Node1~Node7节点,其中Node8~Node10的三个节点作为磁盘节点保存集群元数据和配置信息。为了能够在实际的生产环境使用还需要根据实际的业务需求对集群中的各个实例进行一些性能参数指标的监控,从性能、吞吐量和消息堆积能力等角度考虑,可以选择Kafka来作为RabbitMQ集群的监控队列使用。
参考文档:
1、认识RabbitMQ从这篇文章开始 https://zhuanlan.zhihu.com/p/61855491
2、史上最透彻的 RabbitMQ 可靠消息传输实战 https://juejin.im/entry/5bbc22135188255c6a0456ef
3、RabbitMQ集群原理与部署 http://objcoding.com/2018/10/19/rabbitmq-cluster/
4、消息中间件—RabbitMQ(集群原理与搭建篇) https://www.jianshu.com/p/6376936845ff
5、消息队列之 RabbitMQ https://www.jianshu.com/p/79ca08116d57