欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

java rocketmq--消息的产生(普通消息)

程序员文章站 2022-07-04 17:04:04
前言 与消息发送紧密相关的几行代码: 1. defaultmqproducer producer = new defaultmqproducer("producer...

前言

与消息发送紧密相关的几行代码:

1. defaultmqproducer producer = new defaultmqproducer("producergroupname");

2. producer.start();

3. message msg = new message(...)

4. sendresult sendresult = producer.send(msg);

5. producer.shutdown();

那这几行代码执行时,背后都做了什么?

一. 首先是defaultmqproducer.start

@override
public void start() throws mqclientexception {
this.defaultmqproducerimpl.start();
}

调用了默认生成消息的实现类 -- defaultmqproducerimpl

调用defaultmqproducerimpl.start()方法,defaultmqproducerimpl.start()会初始化得到mqclientinstance实例对象,mqclientinstance实例对象调用它自己的start方法会 ,启动一些服务,如拉去消息服务pullmessageservice.start()、启动负载平衡服务rebalanceservice.start(),比如网络通信服务mqclientapiimpl.start()

另外,还会执行与生产消息相关的信息,如注册producegroup、new一个topicpublishinfo对象并以默认topickey为键值,构成键值对存入defaultmqproducerimpl的topicpublishinfotable中。

efaultmqproducerimpl.start()后,获取的mqclientinstance实例对象会调用sendheartbeattoallbroker()方法,不断向broker发送心跳包,yin'b可以使用下面一幅图大致描述defaultmqproducerimpl.start()过程:

java rocketmq--消息的产生(普通消息)

上图中的三个部分中涉及的内容:

1.1 初始化mqclientinstance

一个客户端只能产生一个mqclientinstance实例对象,产生方式使用了工厂模式与单例模式。mqclientinstance.start()方法启动一些服务,源码如下:

public void start() throws mqclientexception {
synchronized (this) {
switch (this.servicestate) {
case create_just:
this.servicestate = servicestate.start_failed;
// if not specified,looking address from name server
if (null == this.clientconfig.getnamesrvaddr()) {
this.mqclientapiimpl.fetchnameserveraddr();
}
// start request-response channel
this.mqclientapiimpl.start();
// start various schedule tasks
this.startscheduledtask();
// start pull service
this.pullmessageservice.start();
// start rebalance service
this.rebalanceservice.start();
// start push service
this.defaultmqproducer.getdefaultmqproducerimpl().start(false);
log.info("the client factory [{}] start ok", this.clientid);
this.servicestate = servicestate.running;
break;
case running:
break;
case shutdown_already:
break;
case start_failed:
throw new mqclientexception("the factory object[" + this.getclientid() + "] has been created before, and failed.", null);
default:
break;
}
}
}

1.2 注册producer

该过程会将这个当前producer对象注册到mqclientinstance实例对象的的producertable中。一个jvm(一个客户端)中一个producergroup只能有一个实例,mqclientinstance操作producertable大概有如下几个方法:

  • -- selectproducer
  • -- updatetopicrouteinfofromnameserver
  • -- prepareheartbeatdata
  • -- isneedupdatetopicrouteinfo
  • -- shutdown

注:

根据不同的clientid,mqclientmanager将给出不同的mqclientinstance;

根据不同的group,mqclientinstance将给出不同的mqproducer和mqconsumer

1.3 向路由信息表中添加路由

topicpublishinfotable定义:

public class defaultmqproducerimpl implements mqproducerinner {
private final logger log = clientlogger.getlog();
private final random random = new random();
private final defaultmqproducer defaultmqproducer;
private final concurrentmap<string/* topic */, topicpublishinfo> topicpublishinfotable = new concurrenthashmap<string, topicpublishinfo>();

它是一个以topic为key的map型数据结构,defaultmqproducerimpl.start()时会默认创建一个key=mixall.default_topic的topicpublishinfo存放到topicpublishinfotable中。

1.4 发送心跳包

mqclientinstance向broker发送心跳包时,调用sendheartbeattoallbroker( ),以及从mqclientinstance实例对象的brokeraddrtable中拿到所有broker地址,向这些broker发送心跳包。

sendheartbeattoallbroker会涉及到prepareheartbeatdata()方法,该方法会生成heartbeatdata数据,发送心跳包时,heartbeatdata作为心跳包的body。与producer相关的部分代码如下:

// producer
for (map.entry<string/* group */, mqproducerinner> entry : this.producertable.entryset()) {
mqproducerinner impl = entry.getvalue();
if (impl != null) {
producerdata producerdata = new producerdata();
producerdata.setgroupname(entry.getkey());
heartbeatdata.getproducerdataset().add(producerdata);
}

二、. sendresult sendresult = producer.send(msg)

首先会调用defaultmqproducer.send(msg) ,继而调用senddefaultimpl:

public sendresult send(message msg,
long timeout) throws mqclientexception, remotingexception, mqbrokerexception, interruptedexception {
return this.senddefaultimpl(msg, communicationmode.sync, null, timeout);
}

senddefaultimpl做了啥?

2.1. 获取topicpublishinfo

根据msg的topic从topicpublishinfotable获取对应的topicpublishinfo,如果没有则更新路由信息,从nameserver端拉取最新路由信息。从nameserver端拉取最新路由信息大致为:

首先gettopicrouteinfofromnameserver,然后topicroutedata2topicpublishinfo。

java rocketmq--消息的产生(普通消息)

2.2 选择消息发送的队列

普通消息:默认方式下,selectonemessagequeue从topicpublishinfo中的messagequeuelist中选择一个队列(messagequeue)进行发送消息,默认采用长轮询的方式选择队列 。

它的机制如下:正常情况下,顺序选择queue进行发送;如果某一个节点发生了超时,则下次选择queue时,跳过相同的broker。不同的队列选择策略形成了生产消息的几种模式,如顺序消息,事务消息。

顺序消息:将一组需要有序消费的消息发往同一个broker的同一个队列上即可实现顺序消息,假设相同订单号的支付,退款需要放到同一个队列,那么就可以在send的时候,自己实现messagequeueselector,根据参数arg字段来选择queue。

private sendresult sendselectimpl(
message msg,
messagequeueselector selector,
object arg,
final communicationmode communicationmode,
final sendcallback sendcallback, final long timeout
) throws mqclientexception, remotingexception, mqbrokerexception, interruptedexception { 。。。}

事务消息:只有在消息发送成功,并且本地操作执行成功时,才发送提交事务消息,做事务提交,消息发送失败,直接发送回滚消息,进行回滚,具体如何实现后面会单独成文分析。

2.3 封装消息体通信包,发送数据包

首先,根据获取的messagequeue中的getbrokername,调用findbrokeraddressinpublish得到该消息存放对应的broker地址,如果没有找到则跟新路由信息,重新获取地址 :

brokeraddrtable.get(brokername).get(mixall.master_id)

可知获取的broker均为master(id=0)

然后, 将与该消息相关信息打包成remotingcommand数据包,其requestcode.send_message

根据获取的broke地址,将数据包到对应的broker,默认是发送超时时间为3s。

封装消息请求包的包头:

sendmessagerequestheader requestheader = new sendmessagerequestheader();
requestheader.setproducergroup(this.defaultmqproducer.getproducergroup());
requestheader.settopic(msg.gettopic());
requestheader.setdefaulttopic(this.defaultmqproducer.getcreatetopickey());
requestheader.setdefaulttopicqueuenums(this.defaultmqproducer.getdefaulttopicqueuenums());
requestheader.setqueueid(mq.getqueueid());
requestheader.setsysflag(sysflag);
requestheader.setborntimestamp(system.currenttimemillis());
requestheader.setflag(msg.getflag());
requestheader.setproperties(messagedecoder.messageproperties2string(msg.getproperties()));
requestheader.setreconsumetimes(0);
requestheader.setunitmode(this.isunitmode());
requestheader.setbatch(msg instanceof messagebatch);

发送消息包(普通消息默认为同步方式):

sendresult sendresult = null;
switch (communicationmode) {
   case sync:
  sendresult = this.mqclientfactory.getmqclientapiimpl().sendmessage(
  brokeraddr,
  mq.getbrokername(),
   msg,
  requestheader,
   timeout,
  communicationmode,
  context,
  this);
break;

处理来自broker端的响应数据包:

private sendresult sendmessagesync(
final string addr,
final string brokername,
final message msg,
final long timeoutmillis,
final remotingcommand request
) throws remotingexception, mqbrokerexception, interruptedexception {
remotingcommand response = this.remotingclient.invokesync(addr, request, timeoutmillis);
assert response != null;
return this.processsendresponse(brokername, msg, response);
}

broker端处理request数据包后会将消息存储到commitlog,具体过程后续分析。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。