rocketMq-Producer原理源码分析
producer的源码结构如下:
我们通常使用mq发送消息,实例化producer的方式就是:
DefaultMQProducer producer = new DefaultMQProducer("producerGroupName");
producer.setNamesrvAddr(namesrv);
producer.start();
producer.send(msg);
所以就从DefaultMQProducer 开始说起吧:
一、DefaultMQProducer继承ClientConfig类同时实现了MQProducer接口,同时包含一个重要的属性
public class DefaultMQProducer extends ClientConfig implements MQProducer {
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
DefaultMQProducer作为rocketmq生产者的默认实现,其实它并没有做任何实现,其内部引用一个DefaultMQProducerImpl实例进行具体消息发送。
它有一些基础配置,比如多长时间内消息发送多少次还是没成功则放弃(默认为4秒内发送3次,每次发消息默认超时间为3秒)
- 重要字段
1 String producerGroup 生产者的组名。
一个jvm内,具有相同producerGroup名字的生产者实例只有一个。
2 retryAnotherBrokerWhenNotStoreOK
消息没有存储成功是否发送到另外一个broker.
3 sendMsgTimeout
发送消息超时时间,默认为3秒 - 重要方法
send(Message msg)
发送消息,调用DefaultMQProducerImpl .send()发送
ClientConfig类就是客户端的公共配置参数说明:
amesrvAddr //namesrv地址列表,多个地址列表用分号隔开
clientIP //本机ip
instanceName //客户端实例名称,客户端创建的多个producer,consumer实际上共用的一个内部实例(包含网络数据和线程资源)
clientCallbackExecutorThreads //通信层异步回调线程数
pollNameServerInterval //轮询namesrv时间间隔 默认30秒
heartbeatBrokerInterval //向broker发送心跳时间间隔 默认30秒
persistConsumerOffsetInterval //持久化consumer消费进度的时间间隔
包括我们的producer.setNamesrvAddr(namesrv);其实就是调用了ClientConfig.setNamesrvAddr(namesrv);
当然,里面还包括了一些不常用,但也很重要的方法,比如
ClientConfig.resetClientConfig(final ClientConfig cc) //自定义一个客户端配置
ClientConfig.cloneClientConfig() //克隆一个已有的客户端配置
至于MQProducer接口就不用说了,定义了一组接口,例如
start(); //启动
producersend (final Message msg, final SendCallback sendCallback, final long timeout) ; //发送消息
.........
DefaultMQProducerImpl:
非常重要的一个方法,start(),也就是producer的启动方法,该方法的大致流程如下:
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST: 1、
this.serviceState = ServiceState.START_FAILED; 2、
this.checkConfig(); 3、
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID(); 4、
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook); 5、
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); 6、
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); 7、
if (startFactory) {
mQClientFactory.start(); 8、
}
this.serviceState = ServiceState.RUNNING;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); 9、
1、检查DefaultMQProducerImpl.ServiceState的状态(初始化状态为ServiceState.CREATE_JUST);只有状态为CREATE_JUST时才启动该Producer;其他状态均不执行启动过程;
2、将DefaultMQProducerImpl.ServiceState置为start_failed,以免客户端同一个进程中重复启动;
3、检查producerGroup是否合法:不能为空、不能有非法字符、长度不能大于255、不能等于"DEFAULT_PRODUCER";若不合法则直接向应用层抛出MQClientException异常;若producerGroup不等于"CLIENT_INNER_PRODUCER"则设置Producer的实例名(instanceName);调用java的ManagementFactory.getRuntimeMXBean()方法获取该进程的PID作为该Producer的实例名(instanceName);
4、this.defaultMQProducer.changeInstanceNameToPID(); 若producerGroup不等于"CLIENT_INNER_PRODUCER"则设置Producer的实例名(instanceName);调用java的ManagementFactory.getRuntimeMXBean()方法获取该进程的PID作为该Producer的实例名(instanceName);
5、创建一个客户端实例
String clientId = clientConfig.buildMQClientId(); //构建该Producer的ClientID,等于IP地址@instanceName;
instance = new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
创建MQClientInstance对象。先检查单例对象MQClientManager的factoryTable:ConcurrentHashMap<String/* clientId */, MQClientInstance>变量中是否存在该ClientID的对象,若存在则直接返回该MQClientInstance对象,若不存在,则创建MQClientInstance对象,并以该ClientID为key值将新创建的MQClientInstance对象存入并返回,将返回的MQClientInstance对象赋值给DefaultMQProducerImpl.mQClientFactory变量;说明一个IP客户端下面的应用,只有在启动多个进程的情况下才会创建多个MQClientInstance对象;在初始化MQClientInstance对象的过程中,会做如下操作:
this.clientRemotingProcessor = new ClientRemotingProcessor(this); 5.1、
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig); 5.2、
if (this.clientConfig.getNamesrvAddr() != null) { 5.3、
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
this.pullMessageService = new PullMessageService(this); 5.4、
this.rebalanceService = new RebalanceService(this);
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP); 5.5、
5.1)初始化ClientRemotingProcessor对象,处理接受的事件请求;
5.2)初始化MQClientAPIImpl对象,在初始化过程中,初始化了MQClientAPIImpl.remotingClient:NettyRemotingClient对象,将ClientRemotingProcessor对象作为事件处理器注册到NettyRemotingClient对象中,处理的事件号有:CHECK_TRANSACTION_STATE、NOTIFY_CONSUMER_IDS_CHANGED、RESET_CONSUMER_CLIENT_OFFSET、GET_CONSUMER_STATUS_FROM_CLIENT、GET_CONSUMER_RUNNING_INFO、CONSUME_MESSAGE_DIRECTLY。
5.3)若ClientConfig.namesrvAddr不为空,则拆分成数组存入MQClientAPIImpl.remotingClient变量中;
5.4)初始化PullMessageService、RebalanceService、ConsumerStatsManager服务线程;PullMessageService服务线程是供DefaultMQPushConsumer端使用的,RebalanceService服务线程是供Consumser端使用的;
5.5)初始化producerGroup等于"CLIENT_INNER_PRODUCER"的DefaultMQProducer对象;
6、将DefaultMQProducerImpl对象在MQClientInstance中注册,以producerGroup为key值、DefaultMQProducerImpl对象为values值存入MQClientInstance.producerTable:ConcurrentHashMap<String/* group */, MQProducerInner>变量中,若在该变量中已存在该producerGroup的记录则向应用层抛出MQClientException异常;说明在一个客户端的一个进程下面启动多个Producer时producerGroup名字不能一样,否则无法启动;
7、以主题名"TBW102"为key值,新初始化的TopicPublishInfo对象为value值存入DefaultMQProducerImpl.topicPublishInfoTable变量中;(正常情况下使用命令在集群上建立topic变回保存在这里。)
8、mQClientFactory.start();用MQClientInstance.start方法启动MQClientInstance对象;
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr(); 8.1、
}
this.mQClientAPIImpl.start(); 8.2、
this.startScheduledTask(); 8.3、
this.serviceState = ServiceState.RUNNING 8.4、
this.pullMessageService.start(); 8.5、
this.rebalanceService.start(); 8.6、
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
8.1)关于namesrvAddr的设置一般在两个地方,还有一个是在环境变量里设置.如果这两个地方都没有设置会走该方法.一个是在创建producer时候设置producer.setNamesrvAddr(MyUtils.getNamesrvAddr())
另一个是在环境变量里设置而fetchNameServerAddr()
是第三种调用http接口去寻址.需配置hosts信息,客户端默认每隔两分钟去访问一次这个http地址,并更新本地namesrvAddr地址.
8.2)客户端netty启动。
8.3)this.startScheduledTask();启动各种任务调度
8.3.1)从NameSrv遍历TopicRouteInfo(Topic的路由信息有brokerName,queueId组成),然后更新producer和consumer的topic信息 【30秒一次】
8.3.2)清理离线的broker 【30秒一次】
8.3.3)向所有在MQClientInstance.brokerAddrTable列表中的Broker发送心跳消息 【30秒一次】
8.3.4)持久化consumer消费进度 【5秒一次】
8.3.5)启动线程池线程数调整线程。 【每分钟调整一次】
8.3.6)this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
8.3.8)设置DefaultMQProducerImpl的ServiceState为RUNNING,使producer避免重复启动;
8.4)设置DefaultMQProducerImpl的ServiceState为RUNNING,使producer避免重复启动;
8.5)启动拉消息服务PullMessageService。
8.6)启动消费端负载均衡服务RebalanceService
9、调用MQClientInstance.sendHeartbeatToAllBrokerWithLock()方法,向所有在MQClientInstance.brokerAddrTable列表中的Broker发送心跳消息(consumer中有详细描述)。
此,简单总结一下producer的启动过程,只对状态为CREATE_JUST的DefaultMQProducerImpl执行启动过程,首先检查producerGroup合法性,构建producer的instanceName为进程号,构建producer的clientId为aaa@qq.com, 创建MQClientInstance对象,期间所做的事为5.1-5.5,不再赘述。向MQClientInstance中注册DefaultMQProducerImpl,给topicPublishInfoTable添加一个初始值。用MQClientInstance.start方法启动MQClientInstance对象,期间所做的事为8.1-8.6,向Broker发送心跳消息。大概如下流程图:
producer启动以后自然就是向broker发送消息了,下面看看producer向broker发送消息的具体流程
rocketMq支持producer以三种方式发送消息到broker:
可靠同步发送:
SendResult sendResult = producer.send(msg);
可靠异步发送:producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) { .......}
@Override
public void onException(Throwable e) {..........}
});
单向传输:
producer.sendOneway(msg);
无论哪一种发送方式,只要我们没有明确指明超时时间,那就会使用默认的超时时间(30秒)private int sendMsgTimeout = 3000;
下面我们以可靠同步发送为例:
producer.send(msg);实际调用的是DefaultMQProducerImpl.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
在该方法中会执行以下操作(省略了非关键步骤)
this.makeSureStateOK(); 1、
Validators.checkMessage(msg, this.defaultMQProducer); 2、
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); 3、
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; 4、
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); 5、
if (mqSelected != null) {
mq = mqSelected;
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); 6、
}
}
1、检查producer是否成功启动,this.serviceState = ServiceState.RUNNING代表producer已经成功启动,否则抛出异常throw new MQClientException("The producer service state not OK,.....")(我们见过这个异常)
2、参数检查,Message的长度大小,topic的非法字符,判空,长度等进行判断。
3、根据传入的topic从DefaultMQProducerImpl的ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable获取指定的TopicPublishInfo。
2.1)若没有获取到对应的TopicPublishInfo,就是我们实际情况中没有在集群上建立topic就往这个topic上发消息,那么此时不会报错,会建立该topic对应的TopicPublishInfo,然后返回,即topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());但是这会导致两个问题1、 http://www.mamicode.com/info-detail-327693.html 2、此时的topic是随机建立在一个broker上的,以后该producer的消息都会发送到这个broker上,就做不到负载均衡了。因此建议配置autoCreateTopicEnable=false,这样如果发送时的topic没有建立就会报错。
2.2)若有对应的TopicPublishInfo,则返回该topic对应的TopicPublishInfo。
4、根据发送方式设置失败重发次数,sync=3,async=1,sendOneWay=1。
5、选择一个发送队列。int pos = Math.abs(index++) % this.messageQueueList.size();每次获取queue都会通过sendWhichQueue加一来实现对所有queue的轮询,如果入参lastBrokerName不为空,代表上次选择的queue发送失败,这次选择应该避开同一个queue【范围是仅该条消息】。
6、进行真正的消息发送!
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic()); 6.1、
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); 6.2、
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
int sysFlag = 0;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
}
if (hasCheckForbiddenHook()) { 6.3、
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); 6.4、
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 6.5、
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
SendResult sendResult = null;
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( 6.6、
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
return sendResult;
finally {
msg.setBody(prevBody);
}
}
6.1)String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); 获取broker地址,其中比较细节的是如果没获取到地址,会调用tryToFindTopicPublishInfo(mq.getTopic());从namesrv更新topic路由后再次获取。
6.2)此时获取的地址形如10.3.253.227:10911,rocketMq默认开启VipChannel所以发送的时候是发送到10.3.253.227:10909.新的addr是调用brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);得到的。这里能解释一个问题,当时测试的时候往pim的mq发消息,就是报错,跟10911相关的,默认开启VipChannel,所以发送到了10909上,所以需要手动加上一行代码producer.setVipChannelEnabled(false);
6.3)依次检查producer客户端是否设置了ForbiddenContext和SendMessageHook,如果有则执行对应方法(个人感觉这个类似于切面):this.executeCheckForbiddenHook(checkForbiddenContext); this.executeSendMessageHookBefore(context);
6.4)之后根据之前得到的一系列发送消息的配置,来构造发送给Broker的请求头数据。
6.5)如果请求头的topic是以%RETRY%开头的,就再给请求头额外配置重新消费次数和最大重新消费次数
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
6.6)
request.setBody(msg.getBody()); 6.6.1、
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer); 6.6.2、
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { 6.6.3
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response); 6.6.5
}
});
this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback); 6.6.4、
channel.writeAndFlush(request).addListener(new ChannelFutureListener() { //netty相关
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
});
6.6.1)构造request的code和body(实际发送的消息内容)【Byte类型】
6.6.2)调用SendResult sendMessageSync(final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request) 方法
6.6.3)进而调用RemotingClient.invokeSync(addr, request, timeoutMillis);
6.6.4)进而调用NettyRemotingClient.invokeSyncImpl(channel, request, timeoutMillis)给broker发送消息,实际是netty间的通信channel.writeAndFlush(request),并且在超时时间内等待返回结果
6.6.5)根据服务端的响应结果,调用processSendResponse(brokerName, msg, response);组装SendResult。
最后将SendResult层层返回,就回到了最初的样子 SendResult sendResult = producer.send(msg);
其中有一个问题还没理解,使用Netty传输POJO对象,重点在于对象的序列化。序列化的对象通过TCP进行网络传输,结合Netty提供的对象编解码器,可以做到远程传输对象。首先Java需要序列化的对象,需要实现java.io.Serializable接口。可以看到Message对象是实现该接口了的。但是发送消息时实际发送的对象是RemotingCommand,但是他并没有序列化。这是为什么呢?
最后简单总结一下producer发送消息的过程(以sync为例):检查producer状态,检查参数,获取topic的路由信息,选择发送队列,获取broker的地址,配置request请求头,给request添加code值和body的内容,使用netty将从NettyRemotingClient【netty服客户端】消息传输到broker(NettyRemotingServer【netty服务端】),等待响应结果,组装SendResult 并返回。大致如下流程:
上一篇: Python学习笔记——Django视图
下一篇: Python Django项目实例二