【RocketMQ】原理分析:Consumer消费模型、消费规则
1.RocketMQ消费模型
RocketMQ提供了两种消息消费模型,一种是pull主动拉取,另一种是push被动接收。
- 实际上 RocketMQ都是pull模式,而push只是在pull模式上做了一层封装
- RocketMQ是基于长轮训来实现消息的pull
- pull到消息以后触发业务消费者注册的call back
2.消费规则
我们在【RocketMQ】原理分析:Broker集群模式、队列分区 介绍了RocketMQ的多住多从集群模式和通过多个消息队列实现分区。所以如果有一个consumer来了,他到底该在哪个小集群去消费topic下的哪个消息队列?
2.1 master角度
首先,站在master角度看看Consumer是如何分配拉取消息的。假设有两个master节点的集群,分别是A,B一个消费者组,组中有2个consumer。
因为master集群数:group中消费者数 = 1:1,所以两个master节点恰好可以平均分发到两个消费者上(比如consumer1消费brokerA的消息,consumer2消费brokerB的消息)
另外,如果此时只有一个Consumer,那么它会消费两个master节点的数据;如果此时有3个Consumer,那么会有一个Consumer无法消费到消息
2.2 topic角度
接下来,站在topic角度看看Consumer是如何分配拉取消息的。假设有两个master节点的集群,创建了一个TestTopic,并且对这个topic创建了两个队列,也就是分区。
一个消费者可以同时消费多个master节点上的消息,而Topic中的队列数 :group中的消费者数 = 1:1,所以无论是在哪个group都是一个consumer消费Broker集群中一个队列(比如consumer1消费MQ0,consumer2消费MQ1)
注:消费者定义了两个分组,分组的概念也是和kafka一样,通过分组可以实现消息的广播。每个group中有2个Consumer
另外,如果此时只有一个Consumer,那么它会消费所有分区;如果此时有3个分区,那么就有一个Consumer消费2个分区。
2.3 动态平衡
数量关系确定了,那么具体的对应关系是什么呢?比如consumer1到底是消费MQ0还是MQ2。
和kafka一样,消费端也会针对Message Queue做负载均衡,使得每个消费者能够合理的消费多个分区的消息。消费端会通过Rebalance Service线程,10秒钟做一次基于topic下的所有队列负载消费端遍历自己的所有topic,
- 依次调rebalanceByTopic 根据topic获取此topic下的所有queue
- 选择一台broker获取基于group的所有消费端(有心跳向所有broker注册客户端信息)
- 选择队列分配策略实例AllocateMessageQueueStrategy执行分配算法(RocketMQ提供了6中分区的分配算法)
- AllocateMessageQueueAveragely:平均分配算法(默认)
- AllocateMessageQueueAveragelyByCircle:环状分配消息队列
- AllocateMessageQueueByConfig:按照配置来分配队列: 根据用户指定的配置来进行负载
- AllocateMessageQueueByMachineRoom:按照指定机房来配置队列
- AllocateMachineRoomNearby:按照就近机房来配置队列:
- AllocateMessageQueueConsistentHash:一致性hash,根据消费者的cid进行
什么时候触发负载均衡?消费者启动之后消费者数量发生变更 每10秒会触发检查一次rebalance
3.消息的顺序消费
通过分区规则可以实现同类消息在rocketmq上的顺序存储。但是对于消费端来说,如何保证消费的顺序?
前提:生产者 - message queue - 消费者之间是一对一对一的关系。
- 首先,需要保证顺序的消息要发送到同一个message queue中;
- 其次,一个message queue只能被一个消费者消费,这点是由消息队列的分配机制来保证的;
- 最后,一个消费者内部对一个mq的消费要保证是有序的。
在RocketMQ的api中对每个Consumer提供了两种消费方式:
-
MessageListenerConcurrently
:并发监听,也就是基于多个线程并行来消费消息。这个无法保证消息消费的顺序 -
MessageListenerOrderly
:顺序消费,单线程去消费消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
consumer.subscribe("store_topic_test","*");
consumer.registerMessageListener((MessageListenerOrderly) (list, consumeOrderlyContext) -> {
list.stream().forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
return ConsumeOrderlyStatus.SUCCESS;
});
顺序消费会带来一些问题,
- 遇到消息失败的消息,无法跳过,当前队列消费暂停
- 降低了消息处理的性能