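Source Code Analysis of Message Pulling in RocketMQ's PullConsumer
PullConsumer exposes many APIs for pulling messages, but they all fall into two categories: synchronous pull and asynchronous pull.
Synchronous message pull
Every synchronous pull API goes through the pullSyncImpl method of DefaultMQPullConsumerImpl: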
    private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
        long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();

        if (null == mq) {
            throw new MQClientException("mq is null", null);
        }

        if (offset < 0) {
            throw new MQClientException("offset < 0", null);
        }

        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", null);
        }

        this.subscriptionAutomatically(mq.getTopic());

        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

        long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;

        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
            mq,
            subscriptionData.getSubString(),
            subscriptionData.getExpressionType(),
            isTagType ? 0L : subscriptionData.getSubVersion(),
            offset,
            maxNums,
            sysFlag,
            0,
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
            timeoutMillis,
            CommunicationMode.SYNC,
            null
        );
        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
        if (!this.consumeMessageHookList.isEmpty()) {
            ConsumeMessageContext consumeMessageContext = null;
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setConsumerGroup(this.groupName());
            consumeMessageContext.setMq(mq);
            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
            consumeMessageContext.setSuccess(false);
            this.executeHookBefore(consumeMessageContext);
            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
            consumeMessageContext.setSuccess(true);
            this.executeHookAfter(consumeMessageContext);
        }
        return pullResult;
    }
It first calls subscriptionAutomatically to check whether the topic is already subscribed:
    public void subscriptionAutomatically(final String topic) {
        if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
            try {
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
                    topic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
            } catch (Exception ignore) {
            }
        }
    }
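If the topic is not subscribed yet, a new subscription entry matching all messages (SUB_ALL) is created and stored in RebalanceImpl's subscriptionInner map.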
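The check-then-register step can be illustrated in isolation. The following is a minimal sketch, not RocketMQ code: the class AutoSubscribeSketch is hypothetical, and a plain String stands in for SubscriptionData so the example runs on its own.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class AutoSubscribeSketch {
    // Stand-in for SubscriptionData.SUB_ALL, the "match everything" expression.
    static final String SUB_ALL = "*";

    // topic -> subscription expression; plays the role of subscriptionInner.
    static final ConcurrentMap<String, String> subscriptionInner = new ConcurrentHashMap<>();

    // Registers a catch-all subscription only when the topic has none yet.
    static void subscriptionAutomatically(String topic) {
        if (!subscriptionInner.containsKey(topic)) {
            // putIfAbsent keeps an entry that another thread may have added
            // between the containsKey check and this call.
            subscriptionInner.putIfAbsent(topic, SUB_ALL);
        }
    }
}
```

An existing subscription is never overwritten, so a topic that was explicitly subscribed with a tag expression keeps that expression.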
Next, pullKernelImpl is called:
    public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }

        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;

            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }

            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }

            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);

            return pullResult;
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
It first looks up the broker that hosts the message queue via findBrokerAddressInSubscribe.
The broker id passed to that lookup comes from recalculatePullFromWhichNode:
    public long recalculatePullFromWhichNode(final MessageQueue mq) {
        if (this.isConnectBrokerByUser()) {
            return this.defaultBrokerId;
        }

        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (suggest != null) {
            return suggest.get();
        }

        return MixAll.MASTER_ID;
    }
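Unless the user has pinned a specific broker, it looks up the broker id recorded for the message queue in pullFromWhichNodeTable and falls back to MixAll.MASTER_ID when there is no entry.
pullFromWhichNodeTable records the mapping from message queue to broker id: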
    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
(The master's brokerId is 0; a slave's brokerId is greater than 0.)
The findBrokerAddressInSubscribe method:
    public FindBrokerResult findBrokerAddressInSubscribe(
        final String brokerName,
        final long brokerId,
        final boolean onlyThisBroker
    ) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;

        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            brokerAddr = map.get(brokerId);
            slave = brokerId != MixAll.MASTER_ID;
            found = brokerAddr != null;

            if (!found && !onlyThisBroker) {
                Entry<Long, String> entry = map.entrySet().iterator().next();
                brokerAddr = entry.getValue();
                slave = entry.getKey() != MixAll.MASTER_ID;
                found = true;
            }
        }

        if (found) {
            return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
        }

        return null;
    }
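This looks up the address for the given brokerId in brokerAddrTable and determines whether that broker is a slave. If the requested id has no address and onlyThisBroker is false, it falls back to the first broker registered under the same broker name.
The result is wrapped in a FindBrokerResult and returned.
If no route information for the broker is found, updateTopicRouteInfoFromNameServer asks the NameServer for an update, and findBrokerAddressInSubscribe is called again once the update completes.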
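The lookup with fallback is easier to follow when separated from the client instance. Below is a minimal sketch under stated assumptions: BrokerLookupSketch and its Found holder are hypothetical stand-ins (loosely modelled on FindBrokerResult), and the broker version lookup is left out.

```java
import java.util.Map;

class BrokerLookupSketch {
    static final long MASTER_ID = 0L;

    // Hypothetical result holder: the chosen address and whether it is a slave.
    static final class Found {
        final String addr;
        final boolean slave;

        Found(String addr, boolean slave) {
            this.addr = addr;
            this.slave = slave;
        }
    }

    // brokers maps brokerId -> address for a single broker name.
    static Found find(Map<Long, String> brokers, long brokerId, boolean onlyThisBroker) {
        if (brokers == null || brokers.isEmpty()) {
            return null;
        }
        String addr = brokers.get(brokerId);
        if (addr != null) {
            return new Found(addr, brokerId != MASTER_ID);
        }
        if (onlyThisBroker) {
            return null; // the caller insisted on exactly this broker id
        }
        // Fallback: take whichever broker the map yields first.
        Map.Entry<Long, String> any = brokers.entrySet().iterator().next();
        return new Found(any.getValue(), any.getKey() != MASTER_ID);
    }
}
```

Returning null rather than throwing mirrors the flow described above, where the caller reacts to a miss by refreshing the route information and trying once more.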
The relevant values are then packed into the request header, PullMessageRequestHeader.
After that, pullMessage is called:
    public PullResult pullMessage(
        final String addr,
        final PullMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);

        switch (communicationMode) {
            case ONEWAY:
                assert false;
                return null;
            case ASYNC:
                this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
                return null;
            case SYNC:
                return this.pullMessageSync(addr, request, timeoutMillis);
            default:
                assert false;
                break;
        }

        return null;
    }
This is where the two modes mentioned at the start, synchronous and asynchronous pull, diverge.
The pullMessageSync method:
    private PullResult pullMessageSync(
        final String addr,
        final RemotingCommand request,
        final long timeoutMillis
    ) throws RemotingException, InterruptedException, MQBrokerException {
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        return this.processPullResponse(response);
    }
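This simply calls invokeSync, which sends the request to the broker synchronously over Netty.
Message sending itself is covered in a separate post.
Once the response arrives, it is handled by processPullResponse.
The processPullResponse method: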
    private PullResult processPullResponse(
        final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
        PullStatus pullStatus = PullStatus.NO_NEW_MSG;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS:
                pullStatus = PullStatus.FOUND;
                break;
            case ResponseCode.PULL_NOT_FOUND:
                pullStatus = PullStatus.NO_NEW_MSG;
                break;
            case ResponseCode.PULL_RETRY_IMMEDIATELY:
                pullStatus = PullStatus.NO_MATCHED_MSG;
                break;
            case ResponseCode.PULL_OFFSET_MOVED:
                pullStatus = PullStatus.OFFSET_ILLEGAL;
                break;

            default:
                throw new MQBrokerException(response.getCode(), response.getRemark());
        }

        PullMessageResponseHeader responseHeader =
            (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);

        return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
            responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
    }
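The response code determines the PullStatus; any code outside the four handled cases is raised as an MQBrokerException.
decodeCommandCustomHeader then decodes the response header.
Finally, the result is wrapped in a PullResultExt, which carries the still undecoded message bytes along with the offsets and the broker id suggested for the next pull: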
    public class PullResultExt extends PullResult {
        private final long suggestWhichBrokerId;
        private byte[] messageBinary;

        public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
            List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {
            super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList);
            this.suggestWhichBrokerId = suggestWhichBrokerId;
            this.messageBinary = messageBinary;
        }
        ......
    }

    public class PullResult {
        private final PullStatus pullStatus;
        private final long nextBeginOffset;
        private final long minOffset;
        private final long maxOffset;
        private List<MessageExt> msgFoundList;

        public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
            List<MessageExt> msgFoundList) {
            super();
            this.pullStatus = pullStatus;
            this.nextBeginOffset = nextBeginOffset;
            this.minOffset = minOffset;
            this.maxOffset = maxOffset;
            this.msgFoundList = msgFoundList;
        }
        ......
    }
A single pull may return several messages. They are stored in PullResult's msgFoundList; MessageExt is a subclass of Message.
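The code-to-status mapping performed by processPullResponse can be sketched on its own. This is an illustration only: PullStatusSketch is a hypothetical class, the numeric values of the response codes are assumptions made so the example compiles, and a plain IllegalStateException stands in for MQBrokerException.

```java
class PullStatusSketch {
    enum PullStatus { FOUND, NO_NEW_MSG, NO_MATCHED_MSG, OFFSET_ILLEGAL }

    // Assumed numeric values; only the names are taken from the code above.
    static final int SUCCESS = 0;
    static final int PULL_NOT_FOUND = 19;
    static final int PULL_RETRY_IMMEDIATELY = 20;
    static final int PULL_OFFSET_MOVED = 21;

    // Translates a broker response code into the status the caller sees.
    static PullStatus toPullStatus(int code) {
        switch (code) {
            case SUCCESS:
                return PullStatus.FOUND;
            case PULL_NOT_FOUND:
                return PullStatus.NO_NEW_MSG;
            case PULL_RETRY_IMMEDIATELY:
                return PullStatus.NO_MATCHED_MSG;
            case PULL_OFFSET_MOVED:
                return PullStatus.OFFSET_ILLEGAL;
            default:
                // Stand-in for the MQBrokerException thrown in the real method.
                throw new IllegalStateException("unexpected response code: " + code);
        }
    }
}
```

Only FOUND leads to message decoding later on; the other three statuses tell the caller how to adjust its next pull.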
Back in pullSyncImpl, once the messages have been pulled, processPullResult is called:
    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        if (PullStatus.FOUND == pullResult.getPullStatus()) {
            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

            List<MessageExt> msgListFilterAgain = msgList;
            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
                msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }

            if (this.hasHook()) {
                FilterMessageContext filterMessageContext = new FilterMessageContext();
                filterMessageContext.setUnitMode(unitMode);
                filterMessageContext.setMsgList(msgListFilterAgain);
                this.executeHook(filterMessageContext);
            }

            for (MessageExt msg : msgListFilterAgain) {
                String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (traFlag != null && Boolean.parseBoolean(traFlag)) {
                    msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
                }
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                    Long.toString(pullResult.getMinOffset()));
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                    Long.toString(pullResult.getMaxOffset()));
            }

            pullResultExt.setMsgFoundList(msgListFilterAgain);
        }

        pullResultExt.setMessageBinary(null);

        return pullResult;
    }
It first calls updatePullFromWhichNode:
    public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (null == suggest) {
            this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
        } else {
            suggest.set(brokerId);
        }
    }
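This updates the queue-to-brokerId mapping in pullFromWhichNodeTable with the broker id the broker suggested in its response, so the next pull for this queue goes to that node.
Combined with the queue allocation described in the previous post, this rounds out load balancing on the consumer side in clustering mode: queues are spread across consumers, and each queue's pulls are directed to the node the broker recommends.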
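How the suggestion written by updatePullFromWhichNode feeds the next call to recalculatePullFromWhichNode can be shown with a small round trip. This is a minimal sketch under stated assumptions: PullNodeTableSketch is a hypothetical class, a String key replaces MessageQueue, and the user-pinned broker branch is omitted.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

class PullNodeTableSketch {
    static final long MASTER_ID = 0L;

    // queue key -> broker id suggested by the broker for the next pull
    final ConcurrentMap<String, AtomicLong> table = new ConcurrentHashMap<>(32);

    // Records the broker's latest suggestion for this queue.
    void update(String queue, long brokerId) {
        AtomicLong suggest = table.get(queue);
        if (suggest == null) {
            table.put(queue, new AtomicLong(brokerId));
        } else {
            suggest.set(brokerId);
        }
    }

    // Picks the broker id for the next pull, defaulting to the master.
    long recalculate(String queue) {
        AtomicLong suggest = table.get(queue);
        return suggest != null ? suggest.get() : MASTER_ID;
    }
}
```

A queue that has never been pulled resolves to the master; after a response suggests broker 1, the following pull targets the slave until a later response suggests otherwise.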
When the status is PullStatus.FOUND, MessageDecoder.decodes takes the raw bytes, which are in CommitLog storage format, and decodes them into a list of readable MessageExt objects.
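The general idea of turning one byte array into several messages can be sketched with a deliberately simplified wire format. Everything here is an assumption for illustration: DecodeSketch is hypothetical, and each message is just a 4-byte length followed by a UTF-8 body, which is far simpler than the real CommitLog layout that MessageDecoder handles.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class DecodeSketch {
    // Packs bodies as [int length][bytes] frames, back to back.
    static byte[] encode(List<String> bodies) {
        List<byte[]> raw = new ArrayList<>();
        int size = 0;
        for (String body : bodies) {
            byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
            raw.add(bytes);
            size += Integer.BYTES + bytes.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (byte[] bytes : raw) {
            buf.putInt(bytes.length);
            buf.put(bytes);
        }
        return buf.array();
    }

    // Reads frames until the buffer is exhausted.
    static List<String> decodes(ByteBuffer buf) {
        List<String> messages = new ArrayList<>();
        while (buf.hasRemaining()) {
            int length = buf.getInt();
            byte[] body = new byte[length];
            buf.get(body);
            messages.add(new String(body, StandardCharsets.UTF_8));
        }
        return messages;
    }
}
```

The loop shape is the point: the response body is one contiguous buffer, and decoding walks it frame by frame until nothing remains.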
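Next comes a client-side tag check: if the subscription specifies tags and class filter mode is off, only messages whose tag is contained in the subscription's tag set are kept.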
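The second filtering pass can be sketched as follows. The class TagFilterSketch and its Msg type are hypothetical simplifications of MessageExt. One plausible reason for repeating the check on the client is that the broker matches tags by hash code, so a collision could let a message with a different tag through; the exact string comparison here removes such messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class TagFilterSketch {
    // Hypothetical minimal message: a tag (possibly null) and a body.
    static final class Msg {
        final String tags;
        final String body;

        Msg(String tags, String body) {
            this.tags = tags;
            this.body = body;
        }
    }

    // Keeps only messages whose tag is in tagsSet; an empty set or class
    // filter mode means no client-side tag filtering at all.
    static List<Msg> filterAgain(List<Msg> msgs, Set<String> tagsSet, boolean classFilterMode) {
        if (tagsSet.isEmpty() || classFilterMode) {
            return msgs;
        }
        List<Msg> kept = new ArrayList<>(msgs.size());
        for (Msg msg : msgs) {
            if (msg.tags != null && tagsSet.contains(msg.tags)) {
                kept.add(msg);
            }
        }
        return kept;
    }
}
```

Messages without a tag are dropped whenever the subscription names specific tags, which matches the null check in the original loop.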
After that, if any FilterMessageHook has been registered, executeHook runs the filterMessage method of each hook:
    public void executeHook(final FilterMessageContext context) {
        if (!this.filterMessageHookList.isEmpty()) {
            for (FilterMessageHook hook : this.filterMessageHookList) {
                try {
                    hook.filterMessage(context);
                } catch (Throwable e) {
                    log.error("execute hook error. hookName={}", hook.hookName());
                }
            }
        }
    }
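Then properties are set on each message: the transaction id for prepared transactional messages, plus the queue's minimum and maximum offsets.
Once processPullResult returns, if any ConsumeMessageHook has been registered, executeHookBefore and executeHookAfter are called, which run each hook's consumeMessageBefore and consumeMessageAfter methods respectively: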
    public void executeHookBefore(final ConsumeMessageContext context) {
        if (!this.consumeMessageHookList.isEmpty()) {
            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                try {
                    hook.consumeMessageBefore(context);
                } catch (Throwable ignored) {
                }
            }
        }
    }

    public void executeHookAfter(final ConsumeMessageContext context) {
        if (!this.consumeMessageHookList.isEmpty()) {
            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                try {
                    hook.consumeMessageAfter(context);
                } catch (Throwable ignored) {
                }
            }
        }
    }
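A notable property of these hook loops is that each hook runs inside its own try/catch, so one failing hook cannot stop the others or break the pull. The following minimal sketch demonstrates that behaviour; HookSketch and its ConsumeHook interface are hypothetical stand-ins for ConsumeMessageHook, and a list of strings replaces ConsumeMessageContext.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class HookSketch {
    // Hypothetical, reduced version of a consume hook.
    interface ConsumeHook {
        void before(List<String> msgs);
        void after(List<String> msgs);
    }

    final List<ConsumeHook> hooks = new ArrayList<>();

    void executeHookBefore(List<String> msgs) {
        for (ConsumeHook hook : hooks) {
            try {
                hook.before(msgs);
            } catch (Throwable ignored) {
                // A failing hook is skipped; the remaining hooks still run.
            }
        }
    }

    void executeHookAfter(List<String> msgs) {
        for (ConsumeHook hook : hooks) {
            try {
                hook.after(msgs);
            } catch (Throwable ignored) {
            }
        }
    }

    // Registers a broken hook followed by a logging hook and runs both phases.
    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        HookSketch sketch = new HookSketch();
        sketch.hooks.add(new ConsumeHook() {
            @Override public void before(List<String> msgs) { throw new IllegalStateException("broken hook"); }
            @Override public void after(List<String> msgs) { throw new IllegalStateException("broken hook"); }
        });
        sketch.hooks.add(new ConsumeHook() {
            @Override public void before(List<String> msgs) { log.add("before:" + msgs.size()); }
            @Override public void after(List<String> msgs) { log.add("after:" + msgs.size()); }
        });
        List<String> msgs = Arrays.asList("m1", "m2");
        sketch.executeHookBefore(msgs);
        sketch.executeHookAfter(msgs);
        return log;
    }
}
```

In demo(), the second hook still records both phases even though the first hook throws each time. The trade-off is that hook failures are silent, so a hook that needs error reporting has to handle it internally.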
That concludes the synchronous pull path of PullConsumer.
Asynchronous message pull
All asynchronous pull APIs are implemented through pullAsyncImpl:
    private void pullAsyncImpl(
        final MessageQueue mq,
        final SubscriptionData subscriptionData,
        final long offset,
        final int maxNums,
        final PullCallback pullCallback,
        final boolean block,
        final long timeout) throws MQClientException, RemotingException, InterruptedException {
        this.makeSureStateOK();

        if (null == mq) {
            throw new MQClientException("mq is null", null);
        }

        if (offset < 0) {
            throw new MQClientException("offset < 0", null);
        }

        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", null);
        }

        if (null == pullCallback) {
            throw new MQClientException("pullCallback is null", null);
        }

        this.subscriptionAutomatically(mq.getTopic());

        try {
            int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

            long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;

            boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
            this.pullAPIWrapper.pullKernelImpl(
                mq,
                subscriptionData.getSubString(),
                subscriptionData.getExpressionType(),
                isTagType ? 0L : subscriptionData.getSubVersion(),
                offset,
                maxNums,
                sysFlag,
                0,
                this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
                timeoutMillis,
                CommunicationMode.ASYNC,
                new PullCallback() {

                    @Override
                    public void onSuccess(PullResult pullResult) {
                        pullCallback
                            .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));
                    }

                    @Override
                    public void onException(Throwable e) {
                        pullCallback.onException(e);
                    }
                });
        } catch (MQBrokerException e) {
            throw new MQClientException("pullAsync unknow exception", e);
        }
    }
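Compared with the synchronous version, there is one extra parameter, pullCallback, which receives the outcome of the asynchronous pull.
The flow is largely the same as the synchronous pull, except that a new PullCallback is created when pullKernelImpl is called.
Its onSuccess and onException simply delegate to the corresponding methods of the user-supplied pullCallback, which is how the asynchronous callback reaches the user.
As in the synchronous path, the result handed to onSuccess is first post-processed by processPullResult.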
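The wrapping pattern, an internal callback that post-processes the result before forwarding it to the user's callback, can be shown generically. This is a minimal sketch under stated assumptions: CallbackWrapSketch and its PullCallback interface are hypothetical, a String replaces PullResult, and trimming whitespace stands in for processPullResult.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

class CallbackWrapSketch {
    // Hypothetical generic counterpart of the pull callback interface.
    interface PullCallback<T> {
        void onSuccess(T result);
        void onException(Throwable e);
    }

    // Returns a callback that post-processes successes and forwards failures untouched.
    static <T> PullCallback<T> wrap(final PullCallback<T> user, final UnaryOperator<T> postProcess) {
        return new PullCallback<T>() {
            @Override
            public void onSuccess(T result) {
                user.onSuccess(postProcess.apply(result));
            }

            @Override
            public void onException(Throwable e) {
                user.onException(e);
            }
        };
    }

    // Drives the wrapper with one success and one failure and records what the user sees.
    static List<String> demo() {
        final List<String> events = new ArrayList<>();
        PullCallback<String> user = new PullCallback<String>() {
            @Override public void onSuccess(String result) { events.add("ok:" + result); }
            @Override public void onException(Throwable e) { events.add("err:" + e.getMessage()); }
        };
        PullCallback<String> wrapped = wrap(user, s -> s.trim());
        wrapped.onSuccess("  raw result  ");
        wrapped.onException(new RuntimeException("boom"));
        return events;
    }
}
```

The user callback only ever sees the processed result, so decoding and filtering stay inside the client library and the synchronous and asynchronous APIs hand back results in the same shape.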
The subsequent pullKernelImpl call works the same way as in the synchronous case.
The only difference is that it ends by calling pullMessageAsync:
    private void pullMessageAsync(
        final String addr,
        final RemotingCommand request,
        final long timeoutMillis,
        final PullCallback pullCallback
    ) throws RemotingException, InterruptedException {
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                RemotingCommand response = responseFuture.getResponseCommand();
                if (response != null) {
                    try {
                        PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                        assert pullResult != null;
                        pullCallback.onSuccess(pullResult);
                    } catch (Exception e) {
                        pullCallback.onException(e);
                    }
                } else {
                    if (!responseFuture.isSendRequestOK()) {
                        pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
                    } else if (responseFuture.isTimeout()) {
                        pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                            responseFuture.getCause()));
                    } else {
                        pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
                    }
                }
            }
        });
    }
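This too is an asynchronous send performed over Netty.
Message sending itself is covered in a separate post.
Because the send is asynchronous, another callback, an InvokeCallback, is registered here.
Once the request completes, whether a response arrived or the send failed or timed out, InvokeCallback's operationComplete method runs.
When a response is present, operationComplete handles it with processPullResponse, just as the synchronous path does.
It then calls onSuccess on the PullCallback created earlier in pullAsyncImpl, which in turn invokes the user-supplied callback. When there is no response, the failure is reported through onException instead.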
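The branching inside operationComplete boils down to a four-way classification of the completed request. Here is a minimal sketch of that decision order, with assumptions stated: AsyncCompletionSketch and Outcome are hypothetical, Outcome keeps only the three facts the branch logic reads from ResponseFuture, and the returned strings are illustrative labels rather than the real exception messages.

```java
class AsyncCompletionSketch {
    // Hypothetical, reduced view of a completed request.
    static final class Outcome {
        final String response;       // null when no response arrived
        final boolean sendRequestOK; // whether the request left the client
        final boolean timeout;       // whether waiting for the response timed out

        Outcome(String response, boolean sendRequestOK, boolean timeout) {
            this.response = response;
            this.sendRequestOK = sendRequestOK;
            this.timeout = timeout;
        }
    }

    // Mirrors the order of checks: response first, then send failure, then timeout.
    static String classify(Outcome outcome) {
        if (outcome.response != null) {
            return "success";
        }
        if (!outcome.sendRequestOK) {
            return "send request failed";
        }
        if (outcome.timeout) {
            return "wait response timeout";
        }
        return "unknown reason";
    }
}
```

The order matters: a present response always wins, and a send failure is reported before a timeout because a request that never left the client cannot meaningfully time out.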
That concludes the asynchronous pull path as well.