RocketMQ中Producer消息的发送
上篇博客介绍过producer的启动,这里涉及到相关内容就不再累赘了 【rocketmq中producer的启动源码分析】
producer发送消息,首先需要生成message实例:
1 public class message implements serializable { 2 private static final long serialversionuid = 8445773977080406428l; 3 4 private string topic; 5 private int flag; 6 private map<string, string> properties; 7 private byte[] body; 8 private string transactionid; 9 10 public message() {} 11 12 public message(string topic, byte[] body) { 13 this(topic, "", "", 0, body, true); 14 } 15 16 public message(string topic, string tags, byte[] body) { 17 this(topic, tags, "", 0, body, true); 18 } 19 20 public message(string topic, string tags, string keys, byte[] body) { 21 this(topic, tags, keys, 0, body, true); 22 } 23 24 public message(string topic, string tags, string keys, int flag, byte[] body, boolean waitstoremsgok) { 25 this.topic = topic; 26 this.flag = flag; 27 this.body = body; 28 29 if (tags != null && tags.length() > 0) 30 this.settags(tags); 31 32 if (keys != null && keys.length() > 0) 33 this.setkeys(keys); 34 35 this.setwaitstoremsgok(waitstoremsgok); 36 } 37 38 public void settags(string tags) { 39 this.putproperty(messageconst.property_tags, tags); 40 } 41 42 public void setkeys(string keys) { 43 this.putproperty(messageconst.property_keys, keys); 44 } 45 46 public void setwaitstoremsgok(boolean waitstoremsgok) { 47 this.putproperty(messageconst.property_wait_store_msg_ok, boolean.tostring(waitstoremsgok)); 48 } 49 50 void putproperty(final string name, final string value) { 51 if (null == this.properties) { 52 this.properties = new hashmap<string, string>(); 53 } 54 55 this.properties.put(name, value); 56 } 57 58 public void putuserproperty(final string name, final string value) { 59 if (messageconst.string_hash_set.contains(name)) { 60 throw new runtimeexception(string.format( 61 "the property<%s> is used by system, input another please", name)); 62 } 63 64 if (value == null || value.trim().isempty() 65 || name == null || name.trim().isempty()) { 66 throw new illegalargumentexception( 67 "the name or value of property can not be null or blank string!" 68 ); 69 } 70 71 this.putproperty(name, value); 72 } 73 74 }
其中properties中存放需要配置的属性,由messageconst规定其key:
1 public class messageconst { 2 public static final string property_keys = "keys"; 3 public static final string property_tags = "tags"; 4 public static final string property_wait_store_msg_ok = "wait"; 5 public static final string property_delay_time_level = "delay"; 6 public static final string property_retry_topic = "retry_topic"; 7 public static final string property_real_topic = "real_topic"; 8 public static final string property_real_queue_id = "real_qid"; 9 public static final string property_transaction_prepared = "tran_msg"; 10 public static final string property_producer_group = "pgroup"; 11 public static final string property_min_offset = "min_offset"; 12 public static final string property_max_offset = "max_offset"; 13 public static final string property_buyer_id = "buyer_id"; 14 public static final string property_origin_message_id = "origin_message_id"; 15 public static final string property_transfer_flag = "transfer_flag"; 16 public static final string property_correction_flag = "correction_flag"; 17 public static final string property_mq2_flag = "mq2_flag"; 18 public static final string property_reconsume_time = "reconsume_time"; 19 public static final string property_msg_region = "msg_region"; 20 public static final string property_trace_switch = "trace_on"; 21 public static final string property_uniq_client_message_id_keyidx = "uniq_key"; 22 public static final string property_max_reconsume_times = "max_reconsume_times"; 23 public static final string property_consume_start_timestamp = "consume_start_time"; 24 public static final string property_transaction_prepared_queue_offset = "tran_prepared_queue_offset"; 25 public static final string property_transaction_check_times = "transaction_check_times"; 26 public static final string property_check_immunity_time_in_seconds = "check_immunity_time_in_seconds"; 27 }
在创建完message后,通过defaultmqproducer的send方法对消息进行发送
producer支持三种模式的消息发送,由communicationmode枚举规定:
1 public enum communicationmode { 2 sync, 3 async, 4 oneway, 5 }
分别代表:同步、异步以及单向发送
其中同步和异步是根据不同参数类型的send方法来决定的
只要send方法中带有sendcallback参数,都代表着异步发送,否则就是同步,sendcallback提供了异步发送的回滚事件响应:
1 public interface sendcallback { 2 void onsuccess(final sendresult sendresult); 3 4 void onexception(final throwable e); 5 }
而单向发送需要使用sendoneway方法
无论使用哪种方式,最后都是通过调用defaultmqproducer包装的defaultmqproducerimpl的senddefaultimpl方法
defaultmqproducerimpl的senddefaultimpl方法:
1 private sendresult senddefaultimpl( 2 message msg, 3 final communicationmode communicationmode, 4 final sendcallback sendcallback, 5 final long timeout 6 ) throws mqclientexception, remotingexception, mqbrokerexception, interruptedexception { 7 this.makesurestateok(); 8 validators.checkmessage(msg, this.defaultmqproducer); 9 10 final long invokeid = random.nextlong(); 11 long begintimestampfirst = system.currenttimemillis(); 12 long begintimestampprev = begintimestampfirst; 13 long endtimestamp = begintimestampfirst; 14 topicpublishinfo topicpublishinfo = this.trytofindtopicpublishinfo(msg.gettopic()); 15 if (topicpublishinfo != null && topicpublishinfo.ok()) { 16 boolean calltimeout = false; 17 messagequeue mq = null; 18 exception exception = null; 19 sendresult sendresult = null; 20 int timestotal = communicationmode == communicationmode.sync ? 1 + this.defaultmqproducer.getretrytimeswhensendfailed() : 1; 21 int times = 0; 22 string[] brokerssent = new string[timestotal]; 23 for (; times < timestotal; times++) { 24 string lastbrokername = null == mq ? null : mq.getbrokername(); 25 messagequeue mqselected = this.selectonemessagequeue(topicpublishinfo, lastbrokername); 26 if (mqselected != null) { 27 mq = mqselected; 28 brokerssent[times] = mq.getbrokername(); 29 try { 30 begintimestampprev = system.currenttimemillis(); 31 long costtime = begintimestampprev - begintimestampfirst; 32 if (timeout < costtime) { 33 calltimeout = true; 34 break; 35 } 36 37 sendresult = this.sendkernelimpl(msg, mq, communicationmode, sendcallback, topicpublishinfo, timeout - costtime); 38 endtimestamp = system.currenttimemillis(); 39 this.updatefaultitem(mq.getbrokername(), endtimestamp - begintimestampprev, false); 40 switch (communicationmode) { 41 case async: 42 return null; 43 case oneway: 44 return null; 45 case sync: 46 if (sendresult.getsendstatus() != sendstatus.send_ok) { 47 if (this.defaultmqproducer.isretryanotherbrokerwhennotstoreok()) { 48 continue; 49 } 50 } 51 52 return sendresult; 53 default: 54 break; 55 } 56 } catch (remotingexception e) { 57 endtimestamp = system.currenttimemillis(); 58 this.updatefaultitem(mq.getbrokername(), endtimestamp - begintimestampprev, true); 59 log.warn(string.format("sendkernelimpl exception, resend at once, invokeid: %s, rt: %sms, broker: %s", invokeid, endtimestamp - begintimestampprev, mq), e); 60 log.warn(msg.tostring()); 61 exception = e; 62 continue; 63 } catch (mqclientexception e) { 64 endtimestamp = system.currenttimemillis(); 65 this.updatefaultitem(mq.getbrokername(), endtimestamp - begintimestampprev, true); 66 log.warn(string.format("sendkernelimpl exception, resend at once, invokeid: %s, rt: %sms, broker: %s", invokeid, endtimestamp - begintimestampprev, mq), e); 67 log.warn(msg.tostring()); 68 exception = e; 69 continue; 70 } catch (mqbrokerexception e) { 71 endtimestamp = system.currenttimemillis(); 72 this.updatefaultitem(mq.getbrokername(), endtimestamp - begintimestampprev, true); 73 log.warn(string.format("sendkernelimpl exception, resend at once, invokeid: %s, rt: %sms, broker: %s", invokeid, endtimestamp - begintimestampprev, mq), e); 74 log.warn(msg.tostring()); 75 exception = e; 76 switch (e.getresponsecode()) { 77 case responsecode.topic_not_exist: 78 case responsecode.service_not_available: 79 case responsecode.system_error: 80 case responsecode.no_permission: 81 case responsecode.no_buyer_id: 82 case responsecode.not_in_current_unit: 83 continue; 84 default: 85 if (sendresult != null) { 86 return sendresult; 87 } 88 89 throw e; 90 } 91 } catch (interruptedexception e) { 92 endtimestamp = system.currenttimemillis(); 93 this.updatefaultitem(mq.getbrokername(), endtimestamp - begintimestampprev, false); 94 log.warn(string.format("sendkernelimpl exception, throw exception, invokeid: %s, rt: %sms, broker: %s", invokeid, endtimestamp - begintimestampprev, mq), e); 95 log.warn(msg.tostring()); 96 97 log.warn("sendkernelimpl exception", e); 98 log.warn(msg.tostring()); 99 throw e; 100 } 101 } else { 102 break; 103 } 104 } 105 106 if (sendresult != null) { 107 return sendresult; 108 } 109 110 string info = string.format("send [%d] times, still failed, cost [%d]ms, topic: %s, brokerssent: %s", 111 times, 112 system.currenttimemillis() - begintimestampfirst, 113 msg.gettopic(), 114 arrays.tostring(brokerssent)); 115 116 info += faqurl.suggesttodo(faqurl.send_msg_failed); 117 118 mqclientexception mqclientexception = new mqclientexception(info, exception); 119 if (calltimeout) { 120 throw new remotingtoomuchrequestexception("senddefaultimpl call timeout"); 121 } 122 123 if (exception instanceof mqbrokerexception) { 124 mqclientexception.setresponsecode(((mqbrokerexception) exception).getresponsecode()); 125 } else if (exception instanceof remotingconnectexception) { 126 mqclientexception.setresponsecode(clienterrorcode.connect_broker_exception); 127 } else if (exception instanceof remotingtimeoutexception) { 128 mqclientexception.setresponsecode(clienterrorcode.access_broker_timeout); 129 } else if (exception instanceof mqclientexception) { 130 mqclientexception.setresponsecode(clienterrorcode.broker_not_exist_exception); 131 } 132 133 throw mqclientexception; 134 } 135 136 list<string> nslist = this.getmqclientfactory().getmqclientapiimpl().getnameserveraddresslist(); 137 if (null == nslist || nslist.isempty()) { 138 throw new mqclientexception( 139 "no name server address, please set it." + faqurl.suggesttodo(faqurl.name_server_addr_not_exist_url), null).setresponsecode(clienterrorcode.no_name_server_exception); 140 } 141 142 throw new mqclientexception("no route info of this topic, " + msg.gettopic() + faqurl.suggesttodo(faqurl.no_topic_route_info), 143 null).setresponsecode(clienterrorcode.not_found_topic_exception); 144 }
其中communicationmode参数会根据调用的api进行如上所说进行发送类型的设置
而sendcallback参数,只有当使用异步发送的api时才不是null
首先调用makesurestateok方法,检查producer是否启动:
1 private void makesurestateok() throws mqclientexception { 2 if (this.servicestate != servicestate.running) { 3 throw new mqclientexception("the producer service state not ok, " 4 + this.servicestate 5 + faqurl.suggesttodo(faqurl.client_service_not_ok), 6 null); 7 } 8 }
servicestate 在上一篇博客中介绍过了
在检查完producer的状态后,还需要通过validators的checktopic方法验证message的合法性:
1 public static void checktopic(string topic) throws mqclientexception { 2 if (utilall.isblank(topic)) { 3 throw new mqclientexception("the specified topic is blank", null); 4 } 5 6 if (!regularexpressionmatcher(topic, pattern)) { 7 throw new mqclientexception(string.format( 8 "the specified topic[%s] contains illegal characters, allowing only %s", topic, 9 valid_pattern_str), null); 10 } 11 12 if (topic.length() > character_max_length) { 13 throw new mqclientexception("the specified topic is longer than topic max length 255.", null); 14 } 15 16 //whether the same with system reserved keyword 17 if (topic.equals(mixall.auto_create_topic_key_topic)) { 18 throw new mqclientexception( 19 string.format("the topic[%s] is conflict with auto_create_topic_key_topic.", topic), null); 20 } 21 }
验证完毕后,记录开始时间戳,预示着发送的真正开始
接着调用trytofindtopicpublishinfo,根据topic获取路由信息
trytofindtopicpublishinfo方法:
1 private topicpublishinfo trytofindtopicpublishinfo(final string topic) { 2 topicpublishinfo topicpublishinfo = this.topicpublishinfotable.get(topic); 3 if (null == topicpublishinfo || !topicpublishinfo.ok()) { 4 this.topicpublishinfotable.putifabsent(topic, new topicpublishinfo()); 5 this.mqclientfactory.updatetopicrouteinfofromnameserver(topic); 6 topicpublishinfo = this.topicpublishinfotable.get(topic); 7 } 8 9 if (topicpublishinfo.ishavetopicrouterinfo() || topicpublishinfo.ok()) { 10 return topicpublishinfo; 11 } else { 12 this.mqclientfactory.updatetopicrouteinfofromnameserver(topic, true, this.defaultmqproducer); 13 topicpublishinfo = this.topicpublishinfotable.get(topic); 14 return topicpublishinfo; 15 } 16 }
在producer启动中已经介绍过了topicpublishinfotable,是一张记录有关topic的路由信息的map,先尝试获取是否有存在的topicpublishinfo
若是不存在,或者消息队列不可用(ok不成立):
1 public boolean ok() { 2 return null != this.messagequeuelist && !this.messagequeuelist.isempty(); 3 }
ok用来验证该路由上的消息队列是否可用
需要创建一个新的topicpublishinfo放在map中,然后调用updatetopicrouteinfofromnameserver来更新路由信息,updatetopicrouteinfofromnameserver在上一篇说过,在定时任务中会使用,这里就是为了及时更新
若是存在,且有路由信息消息队列可用,则直接返回topicpublishinfo
否则还需要调用updatetopicrouteinfofromnameserver来进行一次更新
回到senddefaultimpl,在取得到路由信息后,现设置calltimeout超时响应为false,用于处理发送超时
接着根据发送方式communicationmode,计算如果发送失败,允许重发的次数,这里是针对同步发送,默认1+2共三次,其他两种模式只允许发送一次
根据发送次数,创建一个记录brokername的数组,再由发送次数进行for循环
首先根据topicpublishinfo和lastbrokername调用selectonemessagequeue选取指定的消息队列,是由topicpublishinfo的selectonemessagequeue方法实现的:
1 public messagequeue selectonemessagequeue(final string lastbrokername) { 2 if (lastbrokername == null) { 3 return selectonemessagequeue(); 4 } else { 5 int index = this.sendwhichqueue.getandincrement(); 6 for (int i = 0; i < this.messagequeuelist.size(); i++) { 7 int pos = math.abs(index++) % this.messagequeuelist.size(); 8 if (pos < 0) 9 pos = 0; 10 messagequeue mq = this.messagequeuelist.get(pos); 11 if (!mq.getbrokername().equals(lastbrokername)) { 12 return mq; 13 } 14 } 15 return selectonemessagequeue(); 16 } 17 } 18 19 public messagequeue selectonemessagequeue() { 20 int index = this.sendwhichqueue.getandincrement(); 21 int pos = math.abs(index) % this.messagequeuelist.size(); 22 if (pos < 0) 23 pos = 0; 24 return this.messagequeuelist.get(pos); 25 }
当lastbrokername等于null,使用selectonemessagequeue的无参方法,其中sendwhichqueue我在上一篇介绍过,不同线程通过getandincrement获得到的index是一个随机值
根据这个index对messagequeuelist取余,来获取在list中的下标,根据这个下标在messagequeuelist中选取一个messagequeue
由于不同的messagequeue有不同的路由信息,所里在这里其实是为了负载均衡,保证每次发送能发送给不同的broker
若是lastbrokername不等于null,还是和上面相似,只不过当选取到了messagequeue时,要和lastbrokername比较,当不想同时,才返回,同样也是为了保证不向同一broker重复发送来保证负载均衡
回到senddefaultimpl,在选取完messagequeue后,记录brokername,在计算是否达到超时事件,当这些成功后需要调用sendkernelimpl来完成真正的发送:
sendkernelimpl方法:
1 private sendresult sendkernelimpl(final message msg, 2 final messagequeue mq, 3 final communicationmode communicationmode, 4 final sendcallback sendcallback, 5 final topicpublishinfo topicpublishinfo, 6 final long timeout) throws mqclientexception, remotingexception, mqbrokerexception, interruptedexception { 7 long beginstarttime = system.currenttimemillis(); 8 string brokeraddr = this.mqclientfactory.findbrokeraddressinpublish(mq.getbrokername()); 9 if (null == brokeraddr) { 10 trytofindtopicpublishinfo(mq.gettopic()); 11 brokeraddr = this.mqclientfactory.findbrokeraddressinpublish(mq.getbrokername()); 12 } 13 14 sendmessagecontext context = null; 15 if (brokeraddr != null) { 16 brokeraddr = mixall.brokervipchannel(this.defaultmqproducer.issendmessagewithvipchannel(), brokeraddr); 17 18 byte[] prevbody = msg.getbody(); 19 try { 20 //for messagebatch,id has been set in the generating process 21 if (!(msg instanceof messagebatch)) { 22 messageclientidsetter.setuniqid(msg); 23 } 24 25 int sysflag = 0; 26 boolean msgbodycompressed = false; 27 if (this.trytocompressmessage(msg)) { 28 sysflag |= messagesysflag.compressed_flag; 29 msgbodycompressed = true; 30 } 31 32 final string tranmsg = msg.getproperty(messageconst.property_transaction_prepared); 33 if (tranmsg != null && boolean.parseboolean(tranmsg)) { 34 sysflag |= messagesysflag.transaction_prepared_type; 35 } 36 37 if (hascheckforbiddenhook()) { 38 checkforbiddencontext checkforbiddencontext = new checkforbiddencontext(); 39 checkforbiddencontext.setnamesrvaddr(this.defaultmqproducer.getnamesrvaddr()); 40 checkforbiddencontext.setgroup(this.defaultmqproducer.getproducergroup()); 41 checkforbiddencontext.setcommunicationmode(communicationmode); 42 checkforbiddencontext.setbrokeraddr(brokeraddr); 43 checkforbiddencontext.setmessage(msg); 44 checkforbiddencontext.setmq(mq); 45 checkforbiddencontext.setunitmode(this.isunitmode()); 46 this.executecheckforbiddenhook(checkforbiddencontext); 47 } 48 49 if (this.hassendmessagehook()) { 50 context = new sendmessagecontext(); 51 context.setproducer(this); 52 context.setproducergroup(this.defaultmqproducer.getproducergroup()); 53 context.setcommunicationmode(communicationmode); 54 context.setbornhost(this.defaultmqproducer.getclientip()); 55 context.setbrokeraddr(brokeraddr); 56 context.setmessage(msg); 57 context.setmq(mq); 58 string istrans = msg.getproperty(messageconst.property_transaction_prepared); 59 if (istrans != null && istrans.equals("true")) { 60 context.setmsgtype(messagetype.trans_msg_half); 61 } 62 63 if (msg.getproperty("__startdelivertime") != null || msg.getproperty(messageconst.property_delay_time_level) != null) { 64 context.setmsgtype(messagetype.delay_msg); 65 } 66 this.executesendmessagehookbefore(context); 67 } 68 69 sendmessagerequestheader requestheader = new sendmessagerequestheader(); 70 requestheader.setproducergroup(this.defaultmqproducer.getproducergroup()); 71 requestheader.settopic(msg.gettopic()); 72 requestheader.setdefaulttopic(this.defaultmqproducer.getcreatetopickey()); 73 requestheader.setdefaulttopicqueuenums(this.defaultmqproducer.getdefaulttopicqueuenums()); 74 requestheader.setqueueid(mq.getqueueid()); 75 requestheader.setsysflag(sysflag); 76 requestheader.setborntimestamp(system.currenttimemillis()); 77 requestheader.setflag(msg.getflag()); 78 requestheader.setproperties(messagedecoder.messageproperties2string(msg.getproperties())); 79 requestheader.setreconsumetimes(0); 80 requestheader.setunitmode(this.isunitmode()); 81 requestheader.setbatch(msg instanceof messagebatch); 82 if (requestheader.gettopic().startswith(mixall.retry_group_topic_prefix)) { 83 string reconsumetimes = messageaccessor.getreconsumetime(msg); 84 if (reconsumetimes != null) { 85 requestheader.setreconsumetimes(integer.valueof(reconsumetimes)); 86 messageaccessor.clearproperty(msg, messageconst.property_reconsume_time); 87 } 88 89 string maxreconsumetimes = messageaccessor.getmaxreconsumetimes(msg); 90 if (maxreconsumetimes != null) { 91 requestheader.setmaxreconsumetimes(integer.valueof(maxreconsumetimes)); 92 messageaccessor.clearproperty(msg, messageconst.property_max_reconsume_times); 93 } 94 } 95 96 sendresult sendresult = null; 97 switch (communicationmode) { 98 case async: 99 message tmpmessage = msg; 100 if (msgbodycompressed) { 101 //if msg body was compressed, msgbody should be reset using prevbody. 102 //clone new message using commpressed message body and recover origin massage. 103 //fix bug:https://github.com/apache/rocketmq-externals/issues/66 104 tmpmessage = messageaccessor.clonemessage(msg); 105 msg.setbody(prevbody); 106 } 107 long costtimeasync = system.currenttimemillis() - beginstarttime; 108 if (timeout < costtimeasync) { 109 throw new remotingtoomuchrequestexception("sendkernelimpl call timeout"); 110 } 111 sendresult = this.mqclientfactory.getmqclientapiimpl().sendmessage( 112 brokeraddr, 113 mq.getbrokername(), 114 tmpmessage, 115 requestheader, 116 timeout - costtimeasync, 117 communicationmode, 118 sendcallback, 119 topicpublishinfo, 120 this.mqclientfactory, 121 this.defaultmqproducer.getretrytimeswhensendasyncfailed(), 122 context, 123 this); 124 break; 125 case oneway: 126 case sync: 127 long costtimesync = system.currenttimemillis() - beginstarttime; 128 if (timeout < costtimesync) { 129 throw new remotingtoomuchrequestexception("sendkernelimpl call timeout"); 130 } 131 sendresult = this.mqclientfactory.getmqclientapiimpl().sendmessage( 132 brokeraddr, 133 mq.getbrokername(), 134 msg, 135 requestheader, 136 timeout - costtimesync, 137 communicationmode, 138 context, 139 this); 140 break; 141 default: 142 assert false; 143 break; 144 } 145 146 if (this.hassendmessagehook()) { 147 context.setsendresult(sendresult); 148 this.executesendmessagehookafter(context); 149 } 150 151 return sendresult; 152 } catch (remotingexception e) { 153 if (this.hassendmessagehook()) { 154 context.setexception(e); 155 this.executesendmessagehookafter(context); 156 } 157 throw e; 158 } catch (mqbrokerexception e) { 159 if (this.hassendmessagehook()) { 160 context.setexception(e); 161 this.executesendmessagehookafter(context); 162 } 163 throw e; 164 } catch (interruptedexception e) { 165 if (this.hassendmessagehook()) { 166 context.setexception(e); 167 this.executesendmessagehookafter(context); 168 } 169 throw e; 170 } finally { 171 msg.setbody(prevbody); 172 } 173 } 174 175 throw new mqclientexception("the broker[" + mq.getbrokername() + "] not exist", null); 176 }
先记录开始时间beginstarttime,为可能的超时做准备
然后根据brokername来获取对应的broker地址
findbrokeraddressinpublish方法:
1 public string findbrokeraddressinpublish(final string brokername) { 2 hashmap<long/* brokerid */, string/* address */> map = this.brokeraddrtable.get(brokername); 3 if (map != null && !map.isempty()) { 4 return map.get(mixall.master_id); 5 } 6 7 return null; 8 }
根据brokername在brokeraddrtable表中进行查找
若是没有找到还是通过trytofindtopicpublishinfo来进行更新,然后再通过findbrokeraddressinpublish重新查找
再往后,如果设置了vip(高优先级队列)通道,那么这里将根据brokeraddr获取vip通道的的地址:
mixall的brokervipchannel方法:
1 public static string brokervipchannel(final boolean ischange, final string brokeraddr) { 2 if (ischange) { 3 string[] ipandport = brokeraddr.split(":"); 4 string brokeraddrnew = ipandport[0] + ":" + (integer.parseint(ipandport[1]) - 2); 5 return brokeraddrnew; 6 } else { 7 return brokeraddr; 8 } 9 }
vip通道的地址计算很简单,只是将端口号减去2
在设置完后,就是一大堆的配置了
这里定义了一个sysflag的整型值,表示消息的类型,有如下取值:
1 public class messagesysflag { 2 public final static int compressed_flag = 0x1; 3 public final static int multi_tags_flag = 0x1 << 1; 4 public final static int transaction_not_type = 0; 5 public final static int transaction_prepared_type = 0x1 << 2; 6 public final static int transaction_commit_type = 0x2 << 2; 7 public final static int transaction_rollback_type = 0x3 << 2; 8 }
还定义了一个msgbodycompressed,表示消息是否经过压缩,trytocompressmessage判断并对消息进行压缩:
trytocompressmessage方法:
1 private boolean trytocompressmessage(final message msg) { 2 if (msg instanceof messagebatch) { 3 //batch dose not support compressing right now 4 return false; 5 } 6 byte[] body = msg.getbody(); 7 if (body != null) { 8 if (body.length >= this.defaultmqproducer.getcompressmsgbodyoverhowmuch()) { 9 try { 10 byte[] data = utilall.compress(body, zipcompresslevel); 11 if (data != null) { 12 msg.setbody(data); 13 return true; 14 } 15 } catch (ioexception e) { 16 log.error("trytocompressmessage exception", e); 17 log.warn(msg.tostring()); 18 } 19 } 20 } 21 22 return false; 23 }
当消息大小大于等于compressmsgbodyoverhowmuch(默认4m)时,使用utilall的compress消息进行压缩处理:
1 public static byte[] compress(final byte[] src, final int level) throws ioexception { 2 byte[] result = src; 3 bytearrayoutputstream bytearrayoutputstream = new bytearrayoutputstream(src.length); 4 java.util.zip.deflater defeater = new java.util.zip.deflater(level); 5 deflateroutputstream deflateroutputstream = new deflateroutputstream(bytearrayoutputstream, defeater); 6 try { 7 deflateroutputstream.write(src); 8 deflateroutputstream.finish(); 9 deflateroutputstream.close(); 10 result = bytearrayoutputstream.tobytearray(); 11 } catch (ioexception e) { 12 defeater.end(); 13 throw e; 14 } finally { 15 try { 16 bytearrayoutputstream.close(); 17 } catch (ioexception ignored) { 18 } 19 20 defeater.end(); 21 } 22 23 return result; 24 }
这里采用zip的方式进行消息压缩
接下来,根据消息是否是事务消息来选择设置sysflag,关于事务消息在后面博客再说
接下来检查是否设置了checkforbiddenhook,若是设置了需要遍历所有的checkforbiddenhook,执行其 checkforbidden方法,来完成禁发
同理检查是否设置了sendmessagehook,遍历所有的sendmessagehook,执行其sendmessagebefore方法,在消息发送完毕后,会执行其sendmessageafter方法
接着会对请求头requestheader进行一大堆设置,做完这些后,进入switch块,根据不同的发送方式做了相应检查
最后无论是哪种发送方式,都会调用mqclientapiimpl的sendmessage方法:
1 public sendresult sendmessage( 2 final string addr, 3 final string brokername, 4 final message msg, 5 final sendmessagerequestheader requestheader, 6 final long timeoutmillis, 7 final communicationmode communicationmode, 8 final sendcallback sendcallback, 9 final topicpublishinfo topicpublishinfo, 10 final mqclientinstance instance, 11 final int retrytimeswhensendfailed, 12 final sendmessagecontext context, 13 final defaultmqproducerimpl producer 14 ) throws remotingexception, mqbrokerexception, interruptedexception { 15 long beginstarttime = system.currenttimemillis(); 16 remotingcommand request = null; 17 if (sendsmartmsg || msg instanceof messagebatch) { 18 sendmessagerequestheaderv2 requestheaderv2 = sendmessagerequestheaderv2.createsendmessagerequestheaderv2(requestheader); 19 request = remotingcommand.createrequestcommand(msg instanceof messagebatch ? requestcode.send_batch_message : requestcode.send_message_v2, requestheaderv2); 20 } else { 21 request = remotingcommand.createrequestcommand(requestcode.send_message, requestheader); 22 } 23 24 request.setbody(msg.getbody()); 25 26 switch (communicationmode) { 27 case oneway: 28 this.remotingclient.invokeoneway(addr, request, timeoutmillis); 29 return null; 30 case async: 31 final atomicinteger times = new atomicinteger(); 32 long costtimeasync = system.currenttimemillis() - beginstarttime; 33 if (timeoutmillis < costtimeasync) { 34 throw new remotingtoomuchrequestexception("sendmessage call timeout"); 35 } 36 this.sendmessageasync(addr, brokername, msg, timeoutmillis - costtimeasync, request, sendcallback, topicpublishinfo, instance, 37 retrytimeswhensendfailed, times, context, producer); 38 return null; 39 case sync: 40 long costtimesync = system.currenttimemillis() - beginstarttime; 41 if (timeoutmillis < costtimesync) { 42 throw new remotingtoomuchrequestexception("sendmessage call timeout"); 43 } 44 return this.sendmessagesync(addr, brokername, msg, timeoutmillis - costtimesync, request); 45 default: 46 assert false; 47 break; 48 } 49 50 return null; 51 }
首先会根据消息的类型,设置不同类型的请求remotingcommand
在完成请求的封装后,还是根据发送方式来执行
oneway方式:
会直接调用remotingclient即netty客户端的invokeoneway方法:
1 public void invokeoneway(string addr, remotingcommand request, long timeoutmillis) throws interruptedexception, 2 remotingconnectexception, remotingtoomuchrequestexception, remotingtimeoutexception, remotingsendrequestexception { 3 final channel channel = this.getandcreatechannel(addr); 4 if (channel != null && channel.isactive()) { 5 try { 6 dobeforerpchooks(addr, request); 7 this.invokeonewayimpl(channel, request, timeoutmillis); 8 } catch (remotingsendrequestexception e) { 9 log.warn("invokeoneway: send request exception, so close the channel[{}]", addr); 10 this.closechannel(addr, channel); 11 throw e; 12 } 13 } else { 14 this.closechannel(addr, channel); 15 throw new remotingconnectexception(addr); 16 } 17 }
首先根据broker的地址在channeltables中选取一个channel(上一篇博客介绍过在netty客户端会缓存一张建立好连接的channel的map即channeltables)
然后和前面相似,执行所有配置了的rpchook的dobeforerequest方法
之后执行invokeonewayimpl方法:
1 public void invokeonewayimpl(final channel channel, final remotingcommand request, final long timeoutmillis) 2 throws interruptedexception, remotingtoomuchrequestexception, remotingtimeoutexception, remotingsendrequestexception { 3 request.markonewayrpc(); 4 boolean acquired = this.semaphoreoneway.tryacquire(timeoutmillis, timeunit.milliseconds); 5 if (acquired) { 6 final semaphorereleaseonlyonce once = new semaphorereleaseonlyonce(this.semaphoreoneway); 7 try { 8 channel.writeandflush(request).addlistener(new channelfuturelistener() { 9 @override 10 public void operationcomplete(channelfuture f) throws exception { 11 once.release(); 12 if (!f.issuccess()) { 13 log.warn("send a request command to channel <" + channel.remoteaddress() + "> failed."); 14 } 15 } 16 }); 17 } catch (exception e) { 18 once.release(); 19 log.warn("write send a request command to channel <" + channel.remoteaddress() + "> failed."); 20 throw new remotingsendrequestexception(remotinghelper.parsechannelremoteaddr(channel), e); 21 } 22 } else { 23 if (timeoutmillis <= 0) { 24 throw new remotingtoomuchrequestexception("invokeonewayimpl invoke too fast"); 25 } else { 26 string info = string.format( 27 "invokeonewayimpl tryacquire semaphore timeout, %dms, waiting thread nums: %d semaphoreasyncvalue: %d", 28 timeoutmillis, 29 this.semaphoreoneway.getqueuelength(), 30 this.semaphoreoneway.availablepermits() 31 ); 32 log.warn(info); 33 throw new remotingtimeoutexception(info); 34 } 35 } 36 }
首先对request的标志位进行设置:
1 public void markonewayrpc() { 2 int bits = 1 << rpc_oneway; 3 this.flag |= bits; 4 }
接着会使用一个信号量semaphorereleaseonlyonce,会保证该信号量被释放一次
最后调用netty的writeandflush方法,进行request的发送,同时设置了异步监听,用于成功后信号量的释放
由于是单向发送,发送完成后并没有过多的处理
async方式:
调用sendmessageasync方法:
1 private void sendmessageasync( 2 final string addr, 3 final string brokername, 4 final message msg, 5 final long timeoutmillis, 6 final remotingcommand request, 7 final sendcallback sendcallback, 8 final topicpublishinfo topicpublishinfo, 9 final mqclientinstance instance, 10 final int retrytimeswhensendfailed, 11 final atomicinteger times, 12 final sendmessagecontext context, 13 final defaultmqproducerimpl producer 14 ) throws interruptedexception, remotingexception { 15 this.remotingclient.invokeasync(addr, request, timeoutmillis, new invokecallback() { 16 @override 17 public void operationcomplete(responsefuture responsefuture) { 18 remotingcommand response = responsefuture.getresponsecommand(); 19 if (null == sendcallback && response != null) { 20 21 try { 22 sendresult sendresult = mqclientapiimpl.this.processsendresponse(brokername, msg, response); 23 if (context != null && sendresult != null) { 24 context.setsendresult(sendresult); 25 context.getproducer().executesendmessagehookafter(context); 26 } 27 } catch (throwable e) { 28 } 29 30 producer.updatefaultitem(brokername, system.currenttimemillis() - responsefuture.getbegintimestamp(), false); 31 return; 32 } 33 34 if (response != null) { 35 try { 36 sendresult sendresult = mqclientapiimpl.this.processsendresponse(brokername, msg, response); 37 assert sendresult != null; 38 if (context != null) { 39 context.setsendresult(sendresult); 40 context.getproducer().executesendmessagehookafter(context); 41 } 42 43 try { 44 sendcallback.onsuccess(sendresult); 45 } catch (throwable e) { 46 } 47 48 producer.updatefaultitem(brokername, system.currenttimemillis() - responsefuture.getbegintimestamp(), false); 49 } catch (exception e) { 50 producer.updatefaultitem(brokername, system.currenttimemillis() - responsefuture.getbegintimestamp(), true); 51 onexceptionimpl(brokername, msg, 0l, request, sendcallback, topicpublishinfo, instance, 52 retrytimeswhensendfailed, times, e, context, false, producer); 53 } 54 } else { 55 producer.updatefaultitem(brokername, system.currenttimemillis() - responsefuture.getbegintimestamp(), true); 56 if (!responsefuture.issendrequestok()) { 57 mqclientexception ex = new mqclientexception("send request failed", responsefuture.getcause()); 58 onexceptionimpl(brokername, msg, 0l, request, sendcallback, topicpublishinfo, instance, 59 retrytimeswhensendfailed, times, ex, context, true, producer); 60 } else if (responsefuture.istimeout()) { 61 mqclientexception ex = new mqclientexception("wait response timeout " + responsefuture.gettimeoutmillis() + "ms", 62 responsefuture.getcause()); 63 onexceptionimpl(brokername, msg, 0l, request, sendcallback, topicpublishinfo, instance, 64 retrytimeswhensendfailed, times, ex, context, true, producer); 65 } else { 66 mqclientexception ex = new mqclientexception("unknow reseaon", responsefuture.getcause()); 67 onexceptionimpl(brokername, msg, 0l, request, sendcallback, topicpublishinfo, instance, 68 retrytimeswhensendfailed, times, ex, context, true, producer); 69 } 70 } 71 } 72 }); 73 }
在这里设置了一个invokecallback,用于处理发送之后的回调
先看到invokeasync方法:
1 public void invokeasync(string addr, remotingcommand request, long timeoutmillis, invokecallback invokecallback) 2 throws interruptedexception, remotingconnectexception, remotingtoomuchrequestexception, remotingtimeoutexception, 3 remotingsendrequestexception { 4 long beginstarttime = system.currenttimemillis(); 5 final channel channel = this.getandcreatechannel(addr); 6 if (channel != null && channel.isactive()) { 7 try { 8 dobeforerpchooks(addr, request); 9 long costtime = system.currenttimemillis() - beginstarttime; 10 if (timeoutmillis < costtime) { 11 throw new remotingtoomuchrequestexception("invokeasync call timeout"); 12 } 13 this.invokeasyncimpl(channel, request, timeoutmillis - costtime, invokecallback); 14 } catch (remotingsendrequestexception e) { 15 log.warn("invokeasync: send request exception, so close the channel[{}]", addr); 16 this.closechannel(addr, channel); 17 throw e; 18 } 19 } else { 20 this.closechannel(addr, channel); 21 throw new remotingconnectexception(addr); 22 } 23 }
和前面oneway类似,其具体实现是invokeasyncimpl
invokeasyncimpl方法:
1 public void invokeasyncimpl(final channel channel, final remotingcommand request, final long timeoutmillis, 2 final invokecallback invokecallback) 3 throws interruptedexception, remotingtoomuchrequestexception, remotingtimeoutexception, remotingsendrequestexception { 4 long beginstarttime = system.currenttimemillis(); 5 final int opaque = request.getopaque(); 6 boolean acquired = this.semaphoreasync.tryacquire(timeoutmillis, timeunit.milliseconds); 7 if (acquired) { 8 final semaphorereleaseonlyonce once = new semaphorereleaseonlyonce(this.semaphoreasync); 9 long costtime = system.currenttimemillis() - beginstarttime; 10 if (timeoutmillis < costtime) { 11 once.release(); 12 throw new remotingtimeoutexception("invokeasyncimpl call timeout"); 13 } 14 15 final responsefuture responsefuture = new responsefuture(channel, opaque, timeoutmillis - costtime, invokecallback, once); 16 this.responsetable.put(opaque, responsefuture); 17 try { 18 channel.writeandflush(request).addlistener(new channelfuturelistener() { 19 @override 20 public void operationcomplete(channelfuture f) throws exception { 21 if (f.issuccess()) { 22 responsefuture.setsendrequestok(true); 23 return; 24 } 25 requestfail(opaque); 26 log.warn("send a request command to channel <{}> failed.", remotinghelper.parsechannelremoteaddr(channel)); 27 } 28 }); 29 } catch (exception e) { 30 responsefuture.release(); 31 log.warn("send a request command to channel <" + remotinghelper.parsechannelremoteaddr(channel) + "> exception", e); 32 throw new remotingsendrequestexception(remotinghelper.parsechannelremoteaddr(channel), e); 33 } 34 } else { 35 if (timeoutmillis <= 0) { 36 throw new remotingtoomuchrequestexception("invokeasyncimpl invoke too fast"); 37 } else { 38 string info = 39 string.format("invokeasyncimpl tryacquire semaphore timeout, %dms, waiting thread nums: %d semaphoreasyncvalue: %d", 40 timeoutmillis, 41 this.semaphoreasync.getqueuelength(), 42 this.semaphoreasync.availablepermits() 43 ); 44 log.warn(info); 45 throw new remotingtimeoutexception(info); 46 } 47 } 48 }
这里会通过request的getopaque方法获取一个opaque值,这个值在request创建时就会被赋值,是一个自增的atomicinteger,也就是每个request的唯一id
之后会创建一个responsefuture封装invokecallback及channel,并将其放入responsetable中
responsetable是一个map:
1 protected final concurrentmap<integer /* opaque */, responsefuture> responsetable = 2 new concurrenthashmap<integer, responsefuture>(256);
其记录了requestid对应的responsefuture,用于管理异步发送后,对接收到响应的异步事件处理
也就是说当发送完毕,接收到响应消息,会通过requestid查找到对应的responsefuture,进而执行刚才设置的invokecallback中的方法,在invokecallback中,会执行processsendresponse方法,完成broker回送的响应消息的处理,最终根据情况会执行用户传入的sendcallback的onsuccess或者onexception方法,以此完成消息的异步发送
之后的步骤和oneway一样,由netty的writeandflush完成发送
sync方式:
调用sendmessagesync方法:
1 private sendresult sendmessagesync( 2 final string addr, 3 final string brokername, 4 final message msg, 5 final long timeoutmillis, 6 final remotingcommand request 7 ) throws remotingexception, mqbrokerexception, interruptedexception { 8 remotingcommand response = this.remotingclient.invokesync(addr, request, timeoutmillis); 9 assert response != null; 10 return this.processsendresponse(brokername, msg, response); 11 }
首先调用netty客户端的invokesync方法:
invokesync方法:
1 public remotingcommand invokesync(string addr, final remotingcommand request, long timeoutmillis) 2 throws interruptedexception, remotingconnectexception, remotingsendrequestexception, remotingtimeoutexception { 3 long beginstarttime = system.currenttimemillis(); 4 final channel channel = this.getandcreatechannel(addr); 5 if (channel != null && channel.isactive()) { 6 try { 7 dobeforerpchooks(addr, request); 8 long costtime = system.currenttimemillis() - beginstarttime; 9 if (timeoutmillis < costtime) { 10 throw new remotingtimeoutexception("invokesync call timeout"); 11 } 12 remotingcommand response = this.invokesyncimpl(channel, request, timeoutmillis - costtime); 13 doafterrpchooks(remotinghelper.parsechannelremoteaddr(channel), request, response); 14 return response; 15 } catch (remotingsendrequestexception e) { 16 log.warn("invokesync: send request exception, so close the channel[{}]", addr); 17 this.closechannel(addr, channel); 18 throw e; 19 } catch (remotingtimeoutexception e) { 20 if (nettyclientconfig.isclientclosesocketiftimeout()) { 21 this.closechannel(addr, channel); 22 log.warn("invokesync: close socket because of timeout, {}ms, {}", timeoutmillis, addr); 23 } 24 log.warn("invokesync: wait response timeout exception, the channel[{}]", addr); 25 throw e; 26 } 27 } else { 28 this.closechannel(addr, channel); 29 throw new remotingconnectexception(addr); 30 } 31 }
还是和前面类似的步骤
直接看到invokesyncimpl方法:
1 public remotingcommand invokesyncimpl(final channel channel, final remotingcommand request, 2 final long timeoutmillis) 3 throws interruptedexception, remotingsendrequestexception, remotingtimeoutexception { 4 final int opaque = request.getopaque(); 5 6 try { 7 final responsefuture responsefuture = new responsefuture(channel, opaque, timeoutmillis, null, null); 8 this.responsetable.put(opaque, responsefuture); 9 final socketaddress addr = channel.remoteaddress(); 10 channel.writeandflush(request).addlistener(new channelfuturelistener() { 11 @override 12 public void operationcomplete(channelfuture f) throws exception { 13 if (f.issuccess()) { 14 responsefuture.setsendrequestok(true); 15 return; 16 } else { 17 responsefuture.setsendrequestok(false); 18 } 19 20 responsetable.remove(opaque); 21 responsefuture.setcause(f.cause()); 22 responsefuture.putresponse(null); 23 log.warn("send a request command to channel <" + addr + "> failed."); 24 } 25 }); 26 27 remotingcommand responsecommand = responsefuture.waitresponse(timeoutmillis); 28 if (null == responsecommand) { 29 if (responsefuture.issendrequestok()) { 30 throw new remotingtimeoutexception(remotinghelper.parsesocketaddressaddr(addr), timeoutmillis, 31 responsefuture.getcause()); 32 } else { 33 throw new remotingsendrequestexception(remotinghelper.parsesocketaddressaddr(addr), responsefuture.getcause()); 34 } 35 } 36 37 return responsecommand; 38 } finally { 39 this.responsetable.remove(opaque); 40 } 41 }
和async基本一致,只不过在完成writeandflush后,使用responsefuture的waitresponse方法,在超时时间内进行等待response的回送
若是发送失败,则会在defaultmqproducerimpl的senddefaultimpl中的for循环继续,直至发送完成或者发送此时用完
若是在超时时间内,接收到broker的回送response,在invokesync中会执行配置了的rpchook的doafterresponse方法,然后在sendmessagesync中由processsendresponse处理接收到的响应
到此producer的消息发送结束