rocketmq消费负载均衡--push消费详解
前言
本文介绍了defaultmqpushconsumerimpl消费者,客户端负载均衡相关知识点。本文从defaultmqpushconsumerimpl启动过程到实现负载均衡,从源代码一步一步分析,共分为6个部分进行介绍,其中第6个部分 rebalancebytopic 为负载均衡的核心逻辑模块,具体过程运用了图文进行阐述。
介绍之前首先抛出几个问题:
1. 要做负载均衡,首先要解决的一个问题是什么?
2. 负载均衡是client端处理还是broker端处理?
个人理解:
1. 要做负载均衡,首先要做的就是信号收集。
所谓信号收集,就是得知道每一个consumergroup有哪些consumer,对应的topic是谁。信号收集分为client端信号收集与broker端信号收集两个部分。
2. 负载均衡放在client端处理。
具体做法是:消费者客户端在启动时完善rebalanceimpl实例,同时拷贝订阅信息存放rebalanceimpl实例对象中,另外也是很重要的一个步骤 -- 通过心跳消息,不停的上报自己到所有broker,注册registerconsumer,等待上述过程准备好之后在client端不断执行的负载均衡服务线程从broker端获取一份全局信息(该consumergroup下所有的消费client),然后分配这些全局信息,获取当前客户端分配到的消费队列。
本文具体的内容:
i. copysubscription
client端信号收集,拷贝订阅信息。
在defaultmqpushconsumerimpl.start()时,会将消费者的topic订阅关系设置到rebalanceimpl的subscriptioninner的map中用于负载:
private void copysubscription() throws mqclientexception { try { //注:一个consumer对象可以订阅多个topic map<string, string> sub = this.defaultmqpushconsumer.getsubscription(); if (sub != null) { for (final map.entry<string, string> entry : sub.entryset()) { final string topic = entry.getkey(); final string substring = entry.getvalue(); subscriptiondata subscriptiondata = filterapi.buildsubscriptiondata(this.defaultmqpushconsumer.getconsumergroup(),// topic, substring); this.rebalanceimpl.getsubscriptioninner().put(topic, subscriptiondata); } } if (null == this.messagelistenerinner) { this.messagelistenerinner = this.defaultmqpushconsumer.getmessagelistener(); } switch (this.defaultmqpushconsumer.getmessagemodel()) { case broadcasting: break; case clustering: final string retrytopic = mixall.getretrytopic(this.defaultmqpushconsumer.getconsumergroup()); subscriptiondata subscriptiondata = filterapi.buildsubscriptiondata(this.defaultmqpushconsumer.getconsumergroup(),// retrytopic, subscriptiondata.sub_all); this.rebalanceimpl.getsubscriptioninner().put(retrytopic, subscriptiondata); break; default: break; } } catch (exception e) { throw new mqclientexception("subscription exception", e); } }
filterapi.buildsubscriptiondata接口将订阅关系转换为subscriptiondata 数据,其中substring包含订阅tag等信息。另外,如果该消费者的消费模式为集群消费,则会将retry的topic一并放到。
ii. 完善rebalanceimpl实例
client继续收集信息:
this.rebalanceimpl.setconsumergroup(this.defaultmqpushconsumer.getconsumergroup()); this.rebalanceimpl.setmessagemodel(this.defaultmqpushconsumer.getmessagemodel()); this.rebalanceimpl.setallocatemessagequeuestrategy(this.defaultmqpushconsumer .getallocatemessagequeuestrategy()); this.rebalanceimpl.setmqclientfactory(this.mqclientfactory);
本文以defaultmqpushconsumerimpl为例,因此this对象类型为defaultmqpushconsumerimp。
iii. this.rebalanceservice.start()
开启负载均衡服务。this.rebalanceservice是一个rebalanceservice实例对象,它继承与servicethread,是一个线程类。 this.rebalanceservice.start()执行时,也即执行rebalanceservice线程体:
@override public void run() { log.info(this.getservicename() + " service started"); while (!this.isstoped()) { this.waitforrunning(waitinterval); this.mqclientfactory.dorebalance(); } log.info(this.getservicename() + " service end"); }
iv. this.mqclientfactory.dorebalance
客户端遍历消费组table,对该客户端上所有消费者独立进行负载均衡,分发消费队列:
public void dorebalance() { for (string group : this.consumertable.keyset()) { mqconsumerinner impl = this.consumertable.get(group); if (impl != null) { try { impl.dorebalance(); } catch (exception e) { log.error("dorebalance exception", e); } } } }
v. mqconsumerinner.dorebalance
由于本文以defaultmqpushconsumerimpl消费过程为例,即defaultmqpushconsumerimpl.dorebalance:
@override public void dorebalance() { if (this.rebalanceimpl != null) { this.rebalanceimpl.dorebalance(); } }
步骤ii 中完善了rebalanceimpl实例,为调用rebalanceimpl.dorebalance()提供了初始数据。
rebalanceimpl.dorebalance()过程如下:
public void dorebalance() { // 前文copysubscription中初始化了subscriptioninner map<string, subscriptiondata> subtable = this.getsubscriptioninner(); if (subtable != null) { for (final map.entry<string, subscriptiondata> entry : subtable.entryset()) { final string topic = entry.getkey(); try { this.rebalancebytopic(topic); } catch (exception e) { if (!topic.startswith(mixall.retry_group_topic_prefix)) { log.warn("rebalancebytopic exception", e); } } } } this.truncatemessagequeuenotmytopic(); }
vi. rebalancebytopic -- 核心步骤之一
rebalancebytopic方法中根据消费者的消费类型为broadcasting或clustering做不同的逻辑处理。clustering逻辑包括broadcasting逻辑,本部分只介绍集群消费负载均衡的逻辑。
集群消费负载均衡逻辑主要代码如下(省略了log等代码):
//1.从topicsubscribeinfotable列表中获取与该topic相关的所有消息队列 set<messagequeue> mqset = this.topicsubscribeinfotable.get(topic); //2. 从broker端获取消费该消费组的所有客户端clientid list<string> cidall = this.mqclientfactory.findconsumeridlist(topic, consumergroup); f (null == mqset) { ... } if (null == cidall) { ... } if (mqset != null && cidall != null) { list<messagequeue> mqall = new arraylist<messagequeue>(); mqall.addall(mqset); collections.sort(mqall); collections.sort(cidall); // 3.创建defaultmqpushconsumer对象时默认设置为allocatemessagequeueaveragely allocatemessagequeuestrategy strategy = this.allocatemessagequeuestrategy; list<messagequeue> allocateresult = null; try { // 4.调用allocatemessagequeueaveragely.allocate方法,获取当前client分配消费队列 allocateresult = strategy.allocate( this.consumergroup, this.mqclientfactory.getclientid(), mqall, cidall); } catch (throwable e) { return; } // 5. 将分配得到的allocateresult 中的队列放入allocateresultset 集合 set<messagequeue> allocateresultset = new hashset<messagequeue>(); if (allocateresult != null) { allocateresultset.addall(allocateresult); } 、 //6. 更新updateprocessqueue boolean changed = this.updateprocessqueuetableinrebalance(topic, allocateresultset); if (changed) { this.messagequeuechanged(topic, mqset, allocateresultset); } }
注:broadcasting逻辑只包含上述的1、6。
集群消费负载均衡逻辑中的1、2、4这三个点相关知识为其核心过程,各个点相关知识如下:
第1点:从topicsubscribeinfotable列表中获取与该topic相关的所有消息队列
第2点: 从broker端获取消费该消费组的所有客户端clientid
首先,消费者对象不断地向所有broker发送心跳包,上报自己,注册并更新订阅关系以及客户端channelinfotable;之后,客户端在做消费负载均衡时获取那些消费客户端,对这些客户端进行负载均衡,分发消费的队列。具体过程如下图所示:
第4点:调用allocatemessagequeueaveragely.allocate方法,获取当前client分配消费队列
注:上图中cid1、cid2、...、cidn通过 getconsumeridlistbygroup 获取,它们在这个consumergroup下所有在线客户端列表中。
当前消费对进行负载均衡策略后获取对应的消息消费队列。具体的算法很简单,可以看源码。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: 基于Ok+Rxjava实现断点续传下载
推荐阅读