欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【RocketMQ】原理分析:Consumer消费模型、消费规则

程序员文章站 2022-03-23 12:55:55
...

1.RocketMQ消费模型

RocketMQ提供了两种消息消费模型,一种是pull主动拉取,另一种是push被动接收。

  • 实际上 RocketMQ都是pull模式,而push只是在pull模式上做了一层封装
  • RocketMQ是基于长轮训来实现消息的pull
  • pull到消息以后触发业务消费者注册的call back

2.消费规则

我们在【RocketMQ】原理分析:Broker集群模式、队列分区 介绍了RocketMQ的多住多从集群模式和通过多个消息队列实现分区。所以如果有一个consumer来了,他到底该在哪个小集群去消费topic下的哪个消息队列?

2.1 master角度

首先,站在master角度看看Consumer是如何分配拉取消息的。假设有两个master节点的集群,分别是A,B一个消费者组,组中有2个consumer。

【RocketMQ】原理分析:Consumer消费模型、消费规则

因为master集群数:group中消费者数 = 1:1,所以两个master节点恰好可以平均分发到两个消费者上(比如consumer1消费brokerA的消息,consumer2消费brokerB的消息)

另外,如果此时只有一个Consumer,那么它会消费两个master节点的数据;如果此时有3个Consumer,那么会有一个Consumer无法消费到消息

2.2 topic角度

接下来,站在topic角度看看Consumer是如何分配拉取消息的。假设有两个master节点的集群,创建了一个TestTopic,并且对这个topic创建了两个队列,也就是分区。

【RocketMQ】原理分析:Consumer消费模型、消费规则
一个消费者可以同时消费多个master节点上的消息,而Topic中的队列数 :group中的消费者数 = 1:1,所以无论是在哪个group都是一个consumer消费Broker集群中一个队列(比如consumer1消费MQ0,consumer2消费MQ1)

注:消费者定义了两个分组,分组的概念也是和kafka一样,通过分组可以实现消息的广播。每个group中有2个Consumer

另外,如果此时只有一个Consumer,那么它会消费所有分区;如果此时有3个分区,那么就有一个Consumer消费2个分区。

2.3 动态平衡

数量关系确定了,那么具体的对应关系是什么呢?比如consumer1到底是消费MQ0还是MQ2。

和kafka一样,消费端也会针对Message Queue做负载均衡,使得每个消费者能够合理的消费多个分区的消息。消费端会通过Rebalance Service线程,10秒钟做一次基于topic下的所有队列负载消费端遍历自己的所有topic,

  1. 依次调rebalanceByTopic 根据topic获取此topic下的所有queue
  2. 选择一台broker获取基于group的所有消费端(有心跳向所有broker注册客户端信息)
  3. 选择队列分配策略实例AllocateMessageQueueStrategy执行分配算法(RocketMQ提供了6中分区的分配算法)
    • AllocateMessageQueueAveragely:平均分配算法(默认)
    • AllocateMessageQueueAveragelyByCircle:环状分配消息队列
    • AllocateMessageQueueByConfig:按照配置来分配队列: 根据用户指定的配置来进行负载
    • AllocateMessageQueueByMachineRoom:按照指定机房来配置队列
    • AllocateMachineRoomNearby:按照就近机房来配置队列:
    • AllocateMessageQueueConsistentHash:一致性hash,根据消费者的cid进行

什么时候触发负载均衡?消费者启动之后消费者数量发生变更 每10秒会触发检查一次rebalance

3.消息的顺序消费

通过分区规则可以实现同类消息在rocketmq上的顺序存储。但是对于消费端来说,如何保证消费的顺序?

前提:生产者 - message queue - 消费者之间是一对一对一的关系。

  1. 首先,需要保证顺序的消息要发送到同一个message queue中;
  2. 其次,一个message queue只能被一个消费者消费,这点是由消息队列的分配机制来保证的;
  3. 最后,一个消费者内部对一个mq的消费要保证是有序的。

在RocketMQ的api中对每个Consumer提供了两种消费方式:

  • MessageListenerConcurrently :并发监听,也就是基于多个线程并行来消费消息。这个无法保证消息消费的顺序
  • MessageListenerOrderly:顺序消费,单线程去消费消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
consumer.subscribe("store_topic_test","*");
consumer.registerMessageListener((MessageListenerOrderly) (list, consumeOrderlyContext) -> {  
    list.stream().forEach(messageExt -> System.out.println(new String(messageExt.getBody())));   
    return ConsumeOrderlyStatus.SUCCESS;
});

顺序消费会带来一些问题,

  1. 遇到消息失败的消息,无法跳过,当前队列消费暂停
  2. 降低了消息处理的性能
相关标签: 消息队列