欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rocketMq-Producer原理源码分析

程序员文章站 2022-07-14 22:39:55
...

producer的源码结构如下:

rocketMq-Producer原理源码分析

rocketMq-Producer原理源码分析


 我们通常使用mq发送消息,实例化producer的方式就是:

DefaultMQProducer producer = new DefaultMQProducer("producerGroupName");
producer.setNamesrvAddr(namesrv);
producer.start();
producer.send(msg);

 所以就从DefaultMQProducer 开始说起吧:

一、DefaultMQProducer继承ClientConfig类同时实现了MQProducer接口,同时包含一个重要的属性

public class DefaultMQProducer extends ClientConfig implements MQProducer {
    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

DefaultMQProducer作为rocketmq生产者的默认实现,其实它并没有做任何实现,其内部引用一个DefaultMQProducerImpl实例进行具体消息发送。

它有一些基础配置,比如多长时间内消息发送多少次还是没成功则放弃(默认为4秒内发送3次,每次发消息默认超时间为3秒)

  • 重要字段 
    1 String producerGroup 生产者的组名。 
    一个jvm内,具有相同producerGroup名字的生产者实例只有一个。 
    2 retryAnotherBrokerWhenNotStoreOK 
    消息没有存储成功是否发送到另外一个broker. 
    3 sendMsgTimeout 
    发送消息超时时间,默认为3秒
  • 重要方法 
    send(Message msg) 
    发送消息,调用DefaultMQProducerImpl .send()发送

        ClientConfig类就是客户端的公共配置参数说明:

amesrvAddr   //namesrv地址列表,多个地址列表用分号隔开
clientIP   //本机ip
instanceName  //客户端实例名称,客户端创建的多个producer,consumer实际上共用的一个内部实例(包含网络数据和线程资源)
clientCallbackExecutorThreads   //通信层异步回调线程数
pollNameServerInterval   //轮询namesrv时间间隔  默认30秒
heartbeatBrokerInterval   //向broker发送心跳时间间隔  默认30秒
persistConsumerOffsetInterval   //持久化consumer消费进度的时间间隔

包括我们的producer.setNamesrvAddr(namesrv);其实就是调用了ClientConfig.setNamesrvAddr(namesrv);

    当然,里面还包括了一些不常用,但也很重要的方法,比如


ClientConfig.resetClientConfig(final ClientConfig cc)      //自定义一个客户端配置
ClientConfig.cloneClientConfig()      //克隆一个已有的客户端配置

至于MQProducer接口就不用说了,定义了一组接口,例如

      start();   //启动

      producersend (final Message msg, final SendCallback sendCallback, final long timeout) ;   //发送消息

      .........

DefaultMQProducerImpl:

非常重要的一个方法,start(),也就是producer的启动方法,该方法的大致流程如下:


public void start(final boolean startFactory) throws MQClientException {
  switch (this.serviceState) {
    case CREATE_JUST:                                     1、
        this.serviceState = ServiceState.START_FAILED;    2、
        this.checkConfig();                               3、
        if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
            this.defaultMQProducer.changeInstanceNameToPID();   4、
        }
        this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);   5、
        boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);             6、
        this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());                 7、
        if (startFactory) {
            mQClientFactory.start();               8、
        }
        this.serviceState = ServiceState.RUNNING;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();  9、

1、检查DefaultMQProducerImpl.ServiceState的状态(初始化状态为ServiceState.CREATE_JUST);只有状态为CREATE_JUST时才启动该Producer;其他状态均不执行启动过程;

2、将DefaultMQProducerImpl.ServiceState置为start_failed,以免客户端同一个进程中重复启动;

3、检查producerGroup是否合法:不能为空、不能有非法字符、长度不能大于255、不能等于"DEFAULT_PRODUCER";若不合法则直接向应用层抛出MQClientException异常;若producerGroup不等于"CLIENT_INNER_PRODUCER"则设置Producer的实例名(instanceName);调用java的ManagementFactory.getRuntimeMXBean()方法获取该进程的PID作为该Producer的实例名(instanceName);

4、this.defaultMQProducer.changeInstanceNameToPID();   若producerGroup不等于"CLIENT_INNER_PRODUCER"则设置Producer的实例名(instanceName);调用java的ManagementFactory.getRuntimeMXBean()方法获取该进程的PID作为该Producer的实例名(instanceName);

5、创建一个客户端实例

String clientId = clientConfig.buildMQClientId();     //构建该Producer的ClientID,等于IP地址@instanceName;
instance = new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);

创建MQClientInstance对象。先检查单例对象MQClientManager的factoryTable:ConcurrentHashMap<String/* clientId */, MQClientInstance>变量中是否存在该ClientID的对象,若存在则直接返回该MQClientInstance对象,若不存在,则创建MQClientInstance对象,并以该ClientID为key值将新创建的MQClientInstance对象存入并返回,将返回的MQClientInstance对象赋值给DefaultMQProducerImpl.mQClientFactory变量;说明一个IP客户端下面的应用,只有在启动多个进程的情况下才会创建多个MQClientInstance对象;在初始化MQClientInstance对象的过程中,会做如下操作:

this.clientRemotingProcessor = new ClientRemotingProcessor(this);   5.1、
 
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);    5.2、
 
if (this.clientConfig.getNamesrvAddr() != null) {                   5.3、
    this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
    log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
this.pullMessageService = new PullMessageService(this);                  5.4、
this.rebalanceService = new RebalanceService(this);
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
 
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);  5.5、

 5.1)初始化ClientRemotingProcessor对象,处理接受的事件请求;

   5.2)初始化MQClientAPIImpl对象,在初始化过程中,初始化了MQClientAPIImpl.remotingClient:NettyRemotingClient对象,将ClientRemotingProcessor对象作为事件处理器注册到NettyRemotingClient对象中,处理的事件号有:CHECK_TRANSACTION_STATE、NOTIFY_CONSUMER_IDS_CHANGED、RESET_CONSUMER_CLIENT_OFFSET、GET_CONSUMER_STATUS_FROM_CLIENT、GET_CONSUMER_RUNNING_INFO、CONSUME_MESSAGE_DIRECTLY。

    5.3)若ClientConfig.namesrvAddr不为空,则拆分成数组存入MQClientAPIImpl.remotingClient变量中;

    5.4)初始化PullMessageService、RebalanceService、ConsumerStatsManager服务线程;PullMessageService服务线程是供DefaultMQPushConsumer端使用的,RebalanceService服务线程是供Consumser端使用的;

    5.5)初始化producerGroup等于"CLIENT_INNER_PRODUCER"的DefaultMQProducer对象;

 6、将DefaultMQProducerImpl对象在MQClientInstance中注册,以producerGroup为key值、DefaultMQProducerImpl对象为values值存入MQClientInstance.producerTable:ConcurrentHashMap<String/* group */, MQProducerInner>变量中,若在该变量中已存在该producerGroup的记录则向应用层抛出MQClientException异常;说明在一个客户端的一个进程下面启动多个Producer时producerGroup名字不能一样,否则无法启动;

7、以主题名"TBW102"为key值,新初始化的TopicPublishInfo对象为value值存入DefaultMQProducerImpl.topicPublishInfoTable变量中;(正常情况下使用命令在集群上建立topic变回保存在这里。)

8、mQClientFactory.start();用MQClientInstance.start方法启动MQClientInstance对象;


public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();    8.1、
                }
                this.mQClientAPIImpl.start();        8.2、
                this.startScheduledTask();           8.3、
                this.serviceState = ServiceState.RUNNING   8.4、
                this.pullMessageService.start();      8.5、
                this.rebalanceService.start();      8.6、
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

 8.1)关于namesrvAddr的设置一般在两个地方,还有一个是在环境变量里设置.如果这两个地方都没有设置会走该方法.一个是在创建producer时候设置producer.setNamesrvAddr(MyUtils.getNamesrvAddr()) 另一个是在环境变量里设置fetchNameServerAddr()是第三种调用http接口去寻址.需配置hosts信息,客户端默认每隔两分钟去访问一次这个http地址,并更新本地namesrvAddr地址.

         8.2)客户端netty启动。

         8.3)this.startScheduledTask();启动各种任务调度   

              8.3.1从NameSrv遍历TopicRouteInfo(Topic的路由信息有brokerName,queueId组成),然后更新producer和consumer的topic信息     【30秒一次】

              8.3.2)清理离线的broker        【30秒一次】

              8.3.3)向所有在MQClientInstance.brokerAddrTable列表中的Broker发送心跳消息    【30秒一次】

              8.3.4)持久化consumer消费进度    【5秒一次】

              8.3.5)启动线程池线程数调整线程。   【每分钟调整一次】

              8.3.6)this.defaultMQProducer.getDefaultMQProducerImpl().start(false);     

              8.3.8)设置DefaultMQProducerImpl的ServiceState为RUNNING,使producer避免重复启动;

         8.4)设置DefaultMQProducerImpl的ServiceState为RUNNING,使producer避免重复启动;

         8.5)启动拉消息服务PullMessageService。

         8.6)启动消费端负载均衡服务RebalanceService

9、调用MQClientInstance.sendHeartbeatToAllBrokerWithLock()方法,向所有在MQClientInstance.brokerAddrTable列表中的Broker发送心跳消息(consumer中有详细描述)。

此,简单总结一下producer的启动过程,只对状态为CREATE_JUST的DefaultMQProducerImpl执行启动过程,首先检查producerGroup合法性,构建producer的instanceName为进程号,构建producer的clientId为aaa@qq.com,  创建MQClientInstance对象,期间所做的事为5.1-5.5,不再赘述。向MQClientInstance中注册DefaultMQProducerImpl,给topicPublishInfoTable添加一个初始值。用MQClientInstance.start方法启动MQClientInstance对象,期间所做的事为8.1-8.6,向Broker发送心跳消息。大概如下流程图:

rocketMq-Producer原理源码分析



producer启动以后自然就是向broker发送消息了,下面看看producer向broker发送消息的具体流程

  rocketMq支持producer以三种方式发送消息到broker:

  可靠同步发送:

SendResult sendResult = producer.send(msg);
可靠异步发送:
producer.send(msg, new SendCallback() {
                  @Override
                  public void onSuccess(SendResult sendResult) { .......}
                  @Override
                  public void onException(Throwable e) {..........}
});

单向传输:

producer.sendOneway(msg);

无论哪一种发送方式,只要我们没有明确指明超时时间,那就会使用默认的超时时间(30秒)private int sendMsgTimeout 3000;

下面我们以可靠同步发送为例:

producer.send(msg);实际调用的是DefaultMQProducerImpl.sendDefaultImpl(msg, CommunicationMode.SYNCnull, timeout);

   在该方法中会执行以下操作(省略了非关键步骤)

this.makeSureStateOK();      1、
 
Validators.checkMessage(msg, this.defaultMQProducer);    2、
 
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());    3、
 
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;    4、
 
    for (; times < timesTotal; times++) {
        String lastBrokerName = null == mq ? null : mq.getBrokerName();
        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);       5、
        if (mqSelected != null) {
            mq = mqSelected;
            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);   6、
         }
}

1、检查producer是否成功启动,this.serviceState = ServiceState.RUNNING代表producer已经成功启动,否则抛出异常throw new MQClientException("The producer service state not OK,.....")(我们见过这个异常) 

   2、参数检查,Message的长度大小,topic的非法字符,判空,长度等进行判断。

   3、根据传入的topic从DefaultMQProducerImpl的ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable获取指定的TopicPublishInfo。

       2.1)若没有获取到对应的TopicPublishInfo,就是我们实际情况中没有在集群上建立topic就往这个topic上发消息,那么此时不会报错,会建立该topic对应的TopicPublishInfo,然后返回,即topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());但是这会导致两个问题1、 http://www.mamicode.com/info-detail-327693.html   2、此时的topic是随机建立在一个broker上的,以后该producer的消息都会发送到这个broker上,就做不到负载均衡了。因此建议配置autoCreateTopicEnable=false,这样如果发送时的topic没有建立就会报错。

        2.2)若有对应的TopicPublishInfo,则返回该topic对应的TopicPublishInfo。

   4、根据发送方式设置失败重发次数,sync=3,async=1,sendOneWay=1。

   5、选择一个发送队列。int pos = Math.abs(index++) % this.messageQueueList.size()每次获取queue都会通过sendWhichQueue加一来实现对所有queue的轮询,如果入参lastBrokerName不为空,代表上次选择的queue发送失败,这次选择应该避开同一个queue【范围是仅该条消息】。

   6、进行真正的消息发送!

if (null == brokerAddr) {
    tryToFindTopicPublishInfo(mq.getTopic());               6.1、
    brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
 
    brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);    6.2、
 
    byte[] prevBody = msg.getBody();
    try {
        //for MessageBatch,ID has been set in the generating process
        if (!(msg instanceof MessageBatch)) {
            MessageClientIDSetter.setUniqID(msg);
        }
 
        int sysFlag = 0;
        if (this.tryToCompressMessage(msg)) {
            sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
        }
 
        if (hasCheckForbiddenHook()) {                                                          6.3、
            CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
            checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
            checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
            checkForbiddenContext.setCommunicationMode(communicationMode);
            checkForbiddenContext.setBrokerAddr(brokerAddr);
            checkForbiddenContext.setMessage(msg);
            checkForbiddenContext.setMq(mq);
            checkForbiddenContext.setUnitMode(this.isUnitMode());
            this.executeCheckForbiddenHook(checkForbiddenContext);
        }
 
        if (this.hasSendMessageHook()) {
            context = new SendMessageContext();
            context.setProducer(this);
            context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            context.setCommunicationMode(communicationMode);
            context.setBornHost(this.defaultMQProducer.getClientIP());
            context.setBrokerAddr(brokerAddr);
            context.setMessage(msg);
            context.setMq(mq);
            String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (isTrans != null && isTrans.equals("true")) {
                context.setMsgType(MessageType.Trans_Msg_Half);
            }
 
            if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                context.setMsgType(MessageType.Delay_Msg);
            }
            this.executeSendMessageHookBefore(context);
        }
 
        SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();                  6.4、
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTopic(msg.getTopic());
        requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
        requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setSysFlag(sysFlag);
        requestHeader.setBornTimestamp(System.currentTimeMillis());
        requestHeader.setFlag(msg.getFlag());
        requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
        requestHeader.setReconsumeTimes(0);
        requestHeader.setUnitMode(this.isUnitMode());
        requestHeader.setBatch(msg instanceof MessageBatch);
        if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                      6.5、
            String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
            if (reconsumeTimes != null) {
                requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
            }
 
            String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
            if (maxReconsumeTimes != null) {
                requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
            }
        }
 
        SendResult sendResult = null;
                sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(                6.6、
                    brokerAddr,
                    mq.getBrokerName(),
                    msg,
                    requestHeader,
                    timeout,
                    communicationMode,
                    sendCallback,
                    topicPublishInfo,
                    this.mQClientFactory,
                    this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                    context,
                    this);
        return sendResult;
  finally {
        msg.setBody(prevBody);
    }
}

6.1)String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());     获取broker地址,其中比较细节的是如果没获取到地址,会调用tryToFindTopicPublishInfo(mq.getTopic());从namesrv更新topic路由后再次获取。

       6.2)此时获取的地址形如10.3.253.227:10911,rocketMq默认开启VipChannel所以发送的时候是发送到10.3.253.227:10909.新的addr是调用brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);得到的。这里能解释一个问题,当时测试的时候往pim的mq发消息,就是报错,跟10911相关的,默认开启VipChannel,所以发送到了10909上,所以需要手动加上一行代码producer.setVipChannelEnabled(false);

   6.3)依次检查producer客户端是否设置了ForbiddenContext和SendMessageHook,如果有则执行对应方法(个人感觉这个类似于切面):this.executeCheckForbiddenHook(checkForbiddenContext);      this.executeSendMessageHookBefore(context);

       6.4)之后根据之前得到的一系列发送消息的配置,来构造发送给Broker的请求头数据。

       6.5)如果请求头的topic是以%RETRY%开头的,就再给请求头额外配置重新消费次数和最大重新消费次数

                      String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);

                      requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));

                      String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);

                      requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));

        6.6)

request.setBody(msg.getBody());              6.6.1、
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer);    6.6.2、
 
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {                 6.6.3
    @Override
    public void operationComplete(ResponseFuture responseFuture) {
        RemotingCommand response = responseFuture.getResponseCommand();
        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);    6.6.5
  }
   
});
 
this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);               6.6.4、
 
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {            //netty相关
    @Override
    public void operationComplete(ChannelFuture f) throws Exception {
        if (f.isSuccess()) {
            responseFuture.setSendRequestOK(true);
            return;
        } else {
            responseFuture.setSendRequestOK(false);
        }
});

 6.6.1)构造request的code和body(实际发送的消息内容)【Byte类型】

                6.6.2)调用SendResult sendMessageSync(final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request)  方法

                6.6.3)进而调用RemotingClient.invokeSync(addr, request, timeoutMillis);

                6.6.4)进而调用NettyRemotingClient.invokeSyncImpl(channel, request, timeoutMillis)给broker发送消息,实际是netty间的通信channel.writeAndFlush(request),并且在超时时间内等待返回结果

                6.6.5)根据服务端的响应结果,调用processSendResponse(brokerName, msg, response);组装SendResult。

        最后将SendResult层层返回,就回到了最初的样子  SendResult sendResult = producer.send(msg);

        其中有一个问题还没理解,使用Netty传输POJO对象,重点在于对象的序列化。序列化的对象通过TCP进行网络传输,结合Netty提供的对象编解码器,可以做到远程传输对象。首先Java需要序列化的对象,需要实现java.io.Serializable接口。可以看到Message对象是实现该接口了的。但是发送消息时实际发送的对象是RemotingCommand,但是他并没有序列化。这是为什么呢?


最后简单总结一下producer发送消息的过程(以sync为例):检查producer状态,检查参数,获取topic的路由信息,选择发送队列,获取broker的地址,配置request请求头,给request添加code值和body的内容,使用netty将从NettyRemotingClient【netty服客户端】消息传输到broker(NettyRemotingServer【netty服务端】),等待响应结果,组装SendResult 并返回。大致如下流程:

rocketMq-Producer原理源码分析