RocketMQ原理及源码解析之Consumer消息消费
本文源码分析是基于RocketMQ 4.6.0;文中分析,省略了部分代码
1. Consumer消费设计
RocektMQ消费设计图(并发消费和顺序消费基本流程相似,顺序消费存在加锁解锁过程)。按照图中从上到下标注的序号顺序分析此图:
虚线框都是consumer客户端,也就是我们的实际业务系统;Broker是MQ的消息存储服务器
-
RebalanceImpl(最上面Rebalance):此类主要是做了消息的负载均衡,就是把所有从nameSrv获取到的MessageQueue的信息按照一定的策略分配给当前client服务器(此client服务器需要消费哪几个queue)(不做详细解释,如有时间,再写一篇说明)
-
RebalanceImpl把负载均衡之后的queue组装成PullRequest,put到PullRequestQueue中,一个Queue组装一个PullRequest。
RebalanceImpl代码如下:
private void rebalanceByTopic(final String topic, final boolean isOrder) { 代码省略... /** * * allocateResultSet即为负载均衡策略分配后的Queue的基本信息的集合 */ boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); 代码省略... } private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { 代码省略... List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { 代码省略... /** * 初始化一个PullRequest */ log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); 代码省略... } /** * 追踪此处代码进去,可以看到PullMessageService类中,executePullRequestImmediately方法里边,this.pullRequestQueue.put(pullRequest);此处把pullRequest放到pullRequestQueue中 */ this.dispatchPullRequest(pullRequestList); return changed; }
-
PullMessageService:MQClientInstance.start启动一个线程去take pullRequestQueue(只有一个线程,所以去broker拉取消息是单线程去拉取的)
PullMessageService代码如下:(上图1-4的过程都是在此处完成的,对照设计图阅读源码更香)
public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { /** * 1. 启动单独一个线程去pullRequestQueue拿请求实体 */ PullRequest pullRequest = this.pullRequestQueue.take(); /** * 拉取消息,代码跟进去 */ this.pullMessage(pullRequest); } } public void pullMessage(final PullRequest pullRequest) { //PullRequest中创建了一个ProcessQueue,这个ProcessQueue是用来存放拉取的消息,先知道 final ProcessQueue processQueue = pullRequest.getProcessQueue(); long cachedMessageCount = processQueue.getMsgCount().get(); long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); /** * 2. 如果堆积未处理的消息过多, */ if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { /** * 3.2 则把PullRequest扔回pullRequestQueue,延时执行(默认50ms */ this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); return; } /** * 如果堆积消息的size过大,同上面的逻辑 */ if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { } /** * 检查订阅关系有没有变化,有可能在延时期间,topic或者consumer的配置都发生了变化(等一些检查判断) */ final long beginTimestamp = System.currentTimeMillis(); /** * 拉取消息成功后的回调函数,处理拉取的消息 */ PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { /** * 消息预处理,客户端再次过滤,set minOffset和maxOffset */ pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: /** * 如果获取到的消息数为0,则立即发起下一次pull */ if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); /** * 4.1 将消息放入ProcessQueue */ boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); /** * 4.2 消费消息,调用messageListener处理,此处代码下面解释 */ DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); /** * 再次提交pull request */ if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } break; default: break; } } } }; try { /** * 3.1 拉取消息 */ this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), /** * 每次拉取消息的数量,默认拉取32条 */ this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, CommunicationMode.ASYNC, pullCallback ); } catch (Exception e) { log.error("pullKernelImpl exception", e); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } }
-
ConsumeMessageConcurrentlyService(文章序号3中的代码,注释4.2提交线程池):并发消费
ConsumeMessageConcurrentlyService
public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); /** * 一次拉取的消息总数,如果小于用户想要一次消费的总数,就一次性消费 */ if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); try { /** * 此处线程池有20个线程,代码跟进去 */ this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); } } else { /** * 如果拉取的消息总数,大于用户想要一次性消费的数量,分多个个线程消费 * 例:一次拉取32条数据,每次只消费一条数据,多线程消费 */ for (int total = 0; total < msgs.size(); ) { List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize); for (int i = 0; i < consumeBatchSize; i++, total++) { if (total < msgs.size()) { msgThis.add(msgs.get(total)); } else { break; } } ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { for (; total < msgs.size(); total++) { msgThis.add(msgs.get(total)); } this.submitConsumeRequestLater(consumeRequest); } } } }
- ConsumeMessageConcurrentlyService内部类ConsumeRequest
@Override public void run() { try { //5.调用我们自己写的消费消息,业务逻辑处理类 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { hasException = true; } long consumeRT = System.currentTimeMillis() - beginTimestamp; if (null == status) { } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { /** * 消费处理业务超过15分钟,超时 */ returnType = ConsumeReturnType.TIME_OUT; } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { returnType = ConsumeReturnType.FAILED; } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } /** * 消费消息后置处理 */ if (!processQueue.isDropped()) { /** * 6. 消息结果处理,代码跟进去我们会发现如果消费成功,则把ProcessQueue中的消息移除。 如果消费失败,重新把消息发送到broker */ ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
就以上(4)、(5)两段代码做一个说明:
2. 本篇重点内容
-
我们看看并发消费(4)的源代码,源码中client端开启了一个大小为20的本地线程池去处理消费逻辑。且知道消费逻辑处理完成之后,我们才会把该消息重processQueue中移除(也就是图中序号6的过程)。其实这个设计很巧妙,继续往下看。
-
并发消费本身就在client端开启了一个线程池并发消费。且每次把消息放入到processQueue中,都会判断processQueue是否大于1000,大于1000就不会把消息放进去。这样的好处就是MQ会更据client端的消息消费能力来决定是否继续把消息拉取到本地(MQ线程池大小为20,会把大于20的消息延迟重新放回到processQueue)。这样就是相当于固定了线程池的大小,拒绝策略就是放回到processQueue,不会丢弃消息。
Java如何固定线程池大小 -
我们很多同学为了提升消费性能,会在本地自己再开一个线程池去消费消息,也就是让mq的线程池把消息提交到自定义线程中处理。其实完全没有必要,可能还会带来其他问题。我们自定义的线程池拒绝策略是怎么样呢?丢弃,应该不行。重新放到队列中。如果我们的系统没有这个消费能力,一直拉取新的消息放到本地队列中,内存会不会撑爆呢?所以并发消费大家还是利用原有MQ线程池去并发处理消费就可以
3. 顺序消费
基本流程相似,添加了加锁的过程。三把锁:分布式锁broker消息队列锁,本地消息队列锁,本地消息队列消费锁。后续有空再写一篇介绍
上一篇: 实验七 集成功率放大电路
下一篇: 数据压缩实验二bmp to yuv