欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

rocketmq消费负载均衡--push消费详解

程序员文章站 2022-03-18 17:09:32
前言 本文介绍了defaultmqpushconsumerimpl消费者,客户端负载均衡相关知识点。本文从defaultmqpushconsumerimpl启动过程到实现...

前言

本文介绍了defaultmqpushconsumerimpl消费者,客户端负载均衡相关知识点。本文从defaultmqpushconsumerimpl启动过程到实现负载均衡,从源代码一步一步分析,共分为6个部分进行介绍,其中第6个部分 rebalancebytopic 为负载均衡的核心逻辑模块,具体过程运用了图文进行阐述。

介绍之前首先抛出几个问题:

1. 要做负载均衡,首先要解决的一个问题是什么?

2. 负载均衡是client端处理还是broker端处理?

个人理解:

1. 要做负载均衡,首先要做的就是信号收集。

所谓信号收集,就是得知道每一个consumergroup有哪些consumer,对应的topic是谁。信号收集分为client端信号收集与broker端信号收集两个部分。

2. 负载均衡放在client端处理。

具体做法是:消费者客户端在启动时完善rebalanceimpl实例,同时拷贝订阅信息存放rebalanceimpl实例对象中,另外也是很重要的一个步骤 -- 通过心跳消息,不停的上报自己到所有broker,注册registerconsumer,等待上述过程准备好之后在client端不断执行的负载均衡服务线程从broker端获取一份全局信息(该consumergroup下所有的消费client),然后分配这些全局信息,获取当前客户端分配到的消费队列。

本文具体的内容:

i. copysubscription

client端信号收集,拷贝订阅信息。

在defaultmqpushconsumerimpl.start()时,会将消费者的topic订阅关系设置到rebalanceimpl的subscriptioninner的map中用于负载:

private void copysubscription() throws mqclientexception {
try {
//注:一个consumer对象可以订阅多个topic
map<string, string> sub = this.defaultmqpushconsumer.getsubscription();
if (sub != null) {
for (final map.entry<string, string> entry : sub.entryset()) {
final string topic = entry.getkey();
final string substring = entry.getvalue();
subscriptiondata subscriptiondata =
filterapi.buildsubscriptiondata(this.defaultmqpushconsumer.getconsumergroup(),//
topic, substring);
this.rebalanceimpl.getsubscriptioninner().put(topic, subscriptiondata);
}
}
if (null == this.messagelistenerinner) {
this.messagelistenerinner = this.defaultmqpushconsumer.getmessagelistener();
}
switch (this.defaultmqpushconsumer.getmessagemodel()) {
case broadcasting:
break;
case clustering:
final string retrytopic = mixall.getretrytopic(this.defaultmqpushconsumer.getconsumergroup());
subscriptiondata subscriptiondata =
filterapi.buildsubscriptiondata(this.defaultmqpushconsumer.getconsumergroup(),//
retrytopic, subscriptiondata.sub_all);
this.rebalanceimpl.getsubscriptioninner().put(retrytopic, subscriptiondata);
break;
default:
break;
}
}
catch (exception e) {
throw new mqclientexception("subscription exception", e);
}
}

filterapi.buildsubscriptiondata接口将订阅关系转换为subscriptiondata 数据,其中substring包含订阅tag等信息。另外,如果该消费者的消费模式为集群消费,则会将retry的topic一并放到。

ii. 完善rebalanceimpl实例

client继续收集信息:

this.rebalanceimpl.setconsumergroup(this.defaultmqpushconsumer.getconsumergroup());
this.rebalanceimpl.setmessagemodel(this.defaultmqpushconsumer.getmessagemodel());
this.rebalanceimpl.setallocatemessagequeuestrategy(this.defaultmqpushconsumer
.getallocatemessagequeuestrategy());
this.rebalanceimpl.setmqclientfactory(this.mqclientfactory);

本文以defaultmqpushconsumerimpl为例,因此this对象类型为defaultmqpushconsumerimp。

iii. this.rebalanceservice.start()

开启负载均衡服务。this.rebalanceservice是一个rebalanceservice实例对象,它继承与servicethread,是一个线程类。 this.rebalanceservice.start()执行时,也即执行rebalanceservice线程体:

@override
public void run() {
log.info(this.getservicename() + " service started");
while (!this.isstoped()) {
this.waitforrunning(waitinterval);
this.mqclientfactory.dorebalance();
}
log.info(this.getservicename() + " service end");
}

iv. this.mqclientfactory.dorebalance

客户端遍历消费组table,对该客户端上所有消费者独立进行负载均衡,分发消费队列:

public void dorebalance() {
for (string group : this.consumertable.keyset()) {
mqconsumerinner impl = this.consumertable.get(group);
if (impl != null) {
try {
impl.dorebalance();
} catch (exception e) {
log.error("dorebalance exception", e);
}
}
}
}

v. mqconsumerinner.dorebalance

由于本文以defaultmqpushconsumerimpl消费过程为例,即defaultmqpushconsumerimpl.dorebalance:

@override
public void dorebalance() {
if (this.rebalanceimpl != null) {
this.rebalanceimpl.dorebalance();
}
}

步骤ii 中完善了rebalanceimpl实例,为调用rebalanceimpl.dorebalance()提供了初始数据。

rebalanceimpl.dorebalance()过程如下:

public void dorebalance() {
     // 前文copysubscription中初始化了subscriptioninner
map<string, subscriptiondata> subtable = this.getsubscriptioninner();
if (subtable != null) {
for (final map.entry<string, subscriptiondata> entry : subtable.entryset()) {
final string topic = entry.getkey();
try {
this.rebalancebytopic(topic);
} catch (exception e) {
if (!topic.startswith(mixall.retry_group_topic_prefix)) {
log.warn("rebalancebytopic exception", e);
}
}
}
}
this.truncatemessagequeuenotmytopic();
}

vi. rebalancebytopic -- 核心步骤之一

rebalancebytopic方法中根据消费者的消费类型为broadcasting或clustering做不同的逻辑处理。clustering逻辑包括broadcasting逻辑,本部分只介绍集群消费负载均衡的逻辑。

集群消费负载均衡逻辑主要代码如下(省略了log等代码):

//1.从topicsubscribeinfotable列表中获取与该topic相关的所有消息队列
set<messagequeue> mqset = this.topicsubscribeinfotable.get(topic);
//2. 从broker端获取消费该消费组的所有客户端clientid
list<string> cidall = this.mqclientfactory.findconsumeridlist(topic, consumergroup);
f (null == mqset) { ... }
if (null == cidall) { ... }
if (mqset != null && cidall != null) {
list<messagequeue> mqall = new arraylist<messagequeue>();
mqall.addall(mqset);
collections.sort(mqall);
collections.sort(cidall);

     // 3.创建defaultmqpushconsumer对象时默认设置为allocatemessagequeueaveragely
allocatemessagequeuestrategy strategy = this.allocatemessagequeuestrategy;

list<messagequeue> allocateresult = null;
try {
         // 4.调用allocatemessagequeueaveragely.allocate方法,获取当前client分配消费队列
allocateresult = strategy.allocate(
this.consumergroup, 
this.mqclientfactory.getclientid(), 
mqall,
cidall);
} catch (throwable e) {
return;
}
    // 5. 将分配得到的allocateresult 中的队列放入allocateresultset 集合
set<messagequeue> allocateresultset = new hashset<messagequeue>();
if (allocateresult != null) {
allocateresultset.addall(allocateresult);
}
、
     //6. 更新updateprocessqueue
boolean changed = this.updateprocessqueuetableinrebalance(topic, allocateresultset);
if (changed) {
this.messagequeuechanged(topic, mqset, allocateresultset);
}
}

注:broadcasting逻辑只包含上述的1、6。

集群消费负载均衡逻辑中的1、2、4这三个点相关知识为其核心过程,各个点相关知识如下:

第1点:从topicsubscribeinfotable列表中获取与该topic相关的所有消息队列

rocketmq消费负载均衡--push消费详解

第2点: 从broker端获取消费该消费组的所有客户端clientid

首先,消费者对象不断地向所有broker发送心跳包,上报自己,注册并更新订阅关系以及客户端channelinfotable;之后,客户端在做消费负载均衡时获取那些消费客户端,对这些客户端进行负载均衡,分发消费的队列。具体过程如下图所示:

rocketmq消费负载均衡--push消费详解

第4点:调用allocatemessagequeueaveragely.allocate方法,获取当前client分配消费队列

rocketmq消费负载均衡--push消费详解

注:上图中cid1、cid2、...、cidn通过 getconsumeridlistbygroup 获取,它们在这个consumergroup下所有在线客户端列表中。

当前消费对进行负载均衡策略后获取对应的消息消费队列。具体的算法很简单,可以看源码。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。