RocketMQ架构 - 同步发送
程序员文章站
2022-07-15 09:23:08
...
前言
围绕 defaultMQProducer.send(message) 这行代码展开分析。
源码解析
通过下面的一系列代码分析可知,RocketMQ发送消息的底层是借助Netty的Channel#writeAndFlush(…)方法实现的。
可以通过defaultMQProducer.setSendMsgTimeout(int timeout)方法修改发送消息的超时时间,默认是3秒。
调用sendDefaultImpl时,通过第二个参数CommunicationMode.SYNC指定使用同步的通信方式。
/**
 * Default send routine (excerpt; runs of dots mark code elided by the article).
 * Checks the producer state and the message, looks up the publish info of the message's
 * topic, then loops up to timesTotal times: select a MessageQueue and send through
 * sendKernelImpl. In SYNC mode a result whose status is not SEND_OK may trigger another
 * attempt.
 *
 * @param msg message to send
 * @param communicationMode communication mode; only SYNC gets more than one attempt
 * @param sendCallback handed unchanged to sendKernelImpl
 * @param timeout overall time budget; costTime (computed in elided code) is subtracted
 *        before each sendKernelImpl call
 * @return the SendResult obtained from sendKernelImpl
 * @throws MQClientException declared; the throw sites at the end of the method are elided
 * @throws RemotingException declared; origin not visible in this excerpt
 * @throws MQBrokerException declared; origin not visible in this excerpt
 * @throws InterruptedException declared; origin not visible in this excerpt
 */
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
/* Make sure the service state is OK (the original note says: ServiceState must be running). */
this.makeSureStateOK();
/* Validate the message. The rules live in Validators and are not shown here; per the original notes: */
/* the body must not be empty and must not exceed the maximum message size (4MB by default); */
/* the topic must not be empty, its length must not be >= 255, it must not be TBW102, and it must not contain illegal characters. */
Validators.checkMessage(msg, this.defaultMQProducer);
.....
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
/* ok(): per the original note, true when the TopicPublishInfo's ArrayList<MessageQueue> has elements. */
if (topicPublishInfo != null && topicPublishInfo.ok()) {
.....
/* Total attempts: SYNC mode gets 1 + retryTimesWhenSendFailed; every other mode gets exactly 1. */
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
/* Records the broker name used by each attempt, indexed by attempt number. */
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
/* Select a MessageQueue (the original note says MQFaultStrategy performs the selection). */
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, null);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
......
/* Core send call; the time already spent (costTime) is deducted from the timeout. */
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
......
switch (communicationMode) {
.....
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
/* Indicate whether to retry another broker on sending failure internally. */
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
/* Retry: jump to the next iteration of the enclosing for loop. */
continue;
}
}
return sendResult;
}
......
} else {
/* No MessageQueue could be selected: stop attempting. */
break;
}
}
/* The loop ended without returning; hand back the last result if there is one. */
if (sendResult != null) {
return sendResult;
}
/* Check the NameServer address list: it must be non-null and non-empty. */
/* Per the original note: MQClientInstance -> MQClientAPIImpl -> NettyRemotingClient#List<String> nameServerAddressList */
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw 。。。。。。
}
throw 。。。。。。
/**
 * Sends one message to the broker that owns the given MessageQueue (excerpt; runs of
 * dots mark code elided by the article). Resolves the broker address, prepares the
 * message (unique id, optional compression, system flags), builds the request header,
 * sends through MQClientAPIImpl#sendMessage and finally restores the original body.
 *
 * @param msg message to send; its body is put back to the original value in the finally block
 * @param mq target queue; its broker name is used to resolve the broker address
 * @param communicationMode communication mode; only the SYNC branch is shown
 * @param sendCallback not used in the visible part of this excerpt
 * @param topicPublishInfo not used in the visible part of this excerpt
 * @param timeout time budget; costTimeSync (computed in elided code) is deducted before sending
 * @return the SendResult returned by MQClientAPIImpl#sendMessage
 * @throws MQClientException declared; the final throw (reached when no broker address is found) is elided
 * @throws RemotingException declared; origin not visible in this excerpt
 * @throws MQBrokerException declared; origin not visible in this excerpt
 * @throws InterruptedException declared; origin not visible in this excerpt
 */
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
。。。。。。
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
/* Address not found: call tryToFindTopicPublishInfo for the topic, then look the address up once more. */
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
/* Per the original note: when the VIP channel is enabled, the broker port number is reduced by 2 (not visible here; confirm in MixAll). */
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
/* Keep the original body so the finally block can restore it. */
byte[] prevBody = msg.getBody();
try {
// For MessageBatch, the ID has already been set while the batch was generated.
if (!(msg instanceof MessageBatch)) {
/* Per the original note: sets UNIQ_KEY, the unique id. */
MessageClientIDSetter.setUniqID(msg);
}
int sysFlag = 0;
boolean msgBodyCompressed = false;
/* Per the original note: a non-MessageBatch message larger than 4KB is compressed (threshold not visible here; confirm in tryToCompressMessage). */
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
/* Determine whether this is a transactional message. */
/* Reads PROPERTY_TRANSACTION_PREPARED (the TRAN_MSG property, per the original note). */
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
/* Per the original note: true when the ArrayList<CheckForbiddenHook> is non-empty. */
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.set。。。。。。
}
/* Per the original note: true when the ArrayList<SendMessageHook> is non-empty. */
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.set。。。。。。
/* Read PROPERTY_TRANSACTION_PREPARED again to classify the message type. */
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
/* Checked after the transactional case, so Delay_Msg overwrites Trans_Msg_Half when both conditions hold. */
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
。。。。。。
}
/* Build the request header. */
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.set。。。。。。
/* Topics starting with RETRY_GROUP_TOPIC_PREFIX (%RETRY%, per the original note). */
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
/* Read the reconsume-times value from the message (RECONSUME_TIME property, per the original note). */
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
/* Copy it into the request header, then clear the property from the message. */
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
/* Read the max-reconsume-times value from the message (MAX_RECONSUME_TIMES property, per the original note). */
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
/* Copy it into the request header, then clear the property from the message. */
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
SendResult sendResult = null;
switch (communicationMode) {
......
case SYNC:
......
/* Core send call; the time already spent (costTimeSync) is deducted from the timeout. */
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
}
/* Same hook check as before sending. */
if (this.hasSendMessageHook()) {
/* Store the result in the SendMessageContext. */
context.setSendResult(sendResult);
/* Run the after-send hooks with that context. */
this.executeSendMessageHookAfter(context);
}
return sendResult;
.....
} finally {
/* Always put the original body back on the message. */
msg.setBody(prevBody);
}
}
/* Reached only when brokerAddr is still null after the second lookup. */
throw ......
}
/**
 * Synchronously sends a request to the given address and returns the response
 * (excerpt; runs of dots mark code elided by the article, including the catch
 * clauses of the try block).
 *
 * @param addr remote address used to obtain the channel
 * @param request command to send
 * @param timeoutMillis time budget; costTime (computed in elided code) is deducted before invokeSyncImpl is called
 * @return the response returned by invokeSyncImpl
 * @throws InterruptedException declared; origin not visible in this excerpt
 * @throws RemotingConnectException if the channel is null or not active
 * @throws RemotingSendRequestException declared; origin not visible in this excerpt
 * @throws RemotingTimeoutException declared; origin not visible in this excerpt
 */
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
......
/* Obtain the channel for addr (created on demand, judging by the helper's name; confirm). */
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
/* Run the RPC hooks that precede the call. */
doBeforeRpcHooks(addr, request);
.....
/* Send and wait for the response with the remaining time budget. */
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
/* Run the RPC hooks that follow the call, passing the channel's remote address and the response. */
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
......
} else {
/* Channel missing or inactive: close it and report a connection failure. */
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
/**
 * Writes the request to the channel and blocks until a response arrives or the
 * timeout elapses.
 *
 * A ResponseFuture is registered in responseTable under the request's opaque id
 * before the write. If the write fails, the listener removes that entry, records
 * the cause and releases the waiter with a null response. The entry is always
 * removed again in the finally block.
 *
 * @param channel channel to write the request to
 * @param request command to send; its opaque id keys the pending-response entry
 * @param timeoutMillis how long to wait for the response
 * @return the response command, never null
 * @throws InterruptedException declared by the method; presumably raised while waiting for the response
 * @throws RemotingSendRequestException if no response arrived and the write was not marked successful
 * @throws RemotingTimeoutException if no response arrived although the write succeeded
 */
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();
    try {
        final ResponseFuture pending = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, pending);
        final SocketAddress addr = channel.remoteAddress();

        /* The actual network write: Netty's Channel#writeAndFlush. */
        final ChannelFuture writeFuture = channel.writeAndFlush(request);
        writeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture written) throws Exception {
                final boolean sendOk = written.isSuccess();
                pending.setSendRequestOK(sendOk);
                if (sendOk) {
                    return;
                }
                /* Write failed: drop the pending entry and release the waiter with a null response. */
                responseTable.remove(opaque);
                pending.setCause(written.cause());
                pending.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        final RemotingCommand reply = pending.waitResponse(timeoutMillis);
        if (reply != null) {
            return reply;
        }
        /* No response: tell a failed write apart from a timeout after a successful write. */
        if (!pending.isSendRequestOK()) {
            throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), pending.getCause());
        }
        throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
            pending.getCause());
    } finally {
        this.responseTable.remove(opaque);
    }
}