欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RocketMQ架构 - 同步发送

程序员文章站 2022-07-15 09:23:08
...

前言

围绕 defaultMQProducer.send(message) 这行代码展开分析。

源码解析

经过下面的一系列代码分析,RocketMQ发送消息的底层借助Netty的Channel#writeAndFlush(…)方法实现。
RocketMQ架构 - 同步发送
RocketMQ架构 - 同步发送
通过defaultMQProducer.setSendMsgTimeout(int timeout)方法改变默认的发送消息的超时时间。默认是3秒。

RocketMQ架构 - 同步发送
通过第二个参数CommunicationMode.SYNC,传递同步的通信方式。

RocketMQ架构 - 同步发送

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
	/* 确保ServiceState是running状态 */
    this.makeSureStateOK();
    /* 要求消息体不为空、消息大小不超过消息最大的大小(默认是4MB)*/
    /* 要求topic不为空,长度不能>=255,不能是TBW102,不能包含非法字符 */
    Validators.checkMessage(msg, this.defaultMQProducer);
	.....
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
     /* ok() : TopicPublishInfo#ArrayList<MessageQueue>属性有元素 */
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
		.....
		/* 同步发送的生产者的重试次数为1 */
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
       
            /* MQFaultStrategy选取一个MessageQueue */
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, null);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
               		......
               		/* 发送消息的核心方法 */
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    ......
                    switch (communicationMode) {
                    .....
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                            	/* Indicate whether to retry another broker on sending failure internally. */
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                	/* 重试 */
                                    continue;
                                }
                            }
                            return sendResult;
                    }
            ......
            } else {
                break;
            }
        }
        if (sendResult != null) {
            return sendResult;
        }

		/* 对NameServer地址列表进行检查,要求不为空,有内容 */
		/* MQClientInstance -> MQClientAPIImpl -> NettyRemotingClient#List<String> nameServerAddressList */
		List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw 。。。。。。
        }
        
        throw 。。。。。。      

RocketMQ架构 - 同步发送

private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    。。。。。。
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
     	/* 如果vipChannelEnabled设置为true,broker的端口号减2 */
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {
            	/* UNIQ_KEY - 唯一的id */
                MessageClientIDSetter.setUniqID(msg);
            }

            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            /* 对于非MessagePatch类型的消息,如果大小超过了4KB,会尝试压缩消息, */
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }

			/* 判断是否是事务消息 */
			/* TRAN_MSG属性值 */
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }

			/* ArrayList<CheckForbiddenHook>不为空,有内容 */
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.set。。。。。。
            }

			/* ArrayList<SendMessageHook>属性不为空,有内容 */
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.set。。。。。。
                
                /* 获取TRAN_MSG属性值 */
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                。。。。。。
            }

			/* 构建请求头 */
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.set。。。。。。
            
            /* %RETRY% */
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            	/* 从Message中获取RECONSUME_TIME属性值 */
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                	/* 更新消费次数 */
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }
				/* 从Message中获取MAX_RECONSUME_TIMES属性值 */
                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                	/* 更新最大消费次数 */
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }

            SendResult sendResult = null;
            switch (communicationMode) {
                ......
                case SYNC:
                    ......
                    /* 发送消息的核心方法 */
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;             
            }
			/* ArrayList<SendMessageHook>属性不为空,有元素 */
            if (this.hasSendMessageHook()) {
            	/* SendMessageContext */
                context.setSendResult(sendResult);
                /* 日志记录 */
                this.executeSendMessageHookAfter(context);
            }
            return sendResult;
        .....
        } finally {
            msg.setBody(prevBody);
        }
    }
    throw ......
}

RocketMQ架构 - 同步发送

@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
	......
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            .....
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
       ......
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        /* !!!RocketMQ发送消息的核心方法 */
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}
相关标签: # RocketMQ