欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Rocketmq offset进度管理

程序员文章站 2022-05-29 22:04:52
...

下文以DefaultMQPushConsumerImpl集群模式消费消息为例。

概述

消息消费完成后,需要将消费进度存储起来,即前面提到的offset。广播模式下,同消费组的消费者相互独立,消费进度要单独存储;集群模式下,同一条消息只会被同一个消费组消费一次,消费进度会参与到负载均衡中,故消费进度是需要共享的。

消费者端

提交offset入口

入口在org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult中的最后一段逻辑:

 public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        ------------------省略-----------------------

        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

消息消费完成(不论成功或失败)后,将消息从ProcessQueue中移除,同时返回ProcessQueue中最小的offset,使用这个offset值更新消费进度,removeMessage返回的offset有两种情况,一是已经没有消息了,返回
ProcessQueue最大offset+1,二是还有消息,则返回未消费消息的最小offset。举个例子,ProcessQueue中有offset为101-110的10条消息,如果全部消费完了,返回的offset为111;如果101未消费完成,102-110消费完成,则返回的offset为101,这种情况下如果消费者异常退出,会出现重复消费的风险,所以要求消费逻辑幂等。

updateOffset逻辑

看RemoteBrokerOffsetStore的updateOffset()逻辑,将offset更新到内存中,这里RemoteBrokerOffsetStore使用ConcurrentHashMap保存MessageQueue的消费进度:

    @Override
    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        if (mq != null) {
            AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {
                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
            }

            if (null != offsetOld) {
                if (increaseOnly) {
                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
                } else {
                    offsetOld.set(offset);
                }
            }
        }
    }

可以看到,这里将offset更新到内存中就返回了,并没有向broker端提交,具体提交逻辑有两种方式:

方式1:拉取消息时顺带提交

来看DefaultMQPushConsumerImpl类的pullMessage方法

    public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        ------------------此处省略500字----------------------
        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
        }

        
        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
        );
        try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    }

代码中,我们可以看到 通过this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);从内存中读到待提交的offset值,并将commitOffsetEnable设置为true. 核心方法pullKernelImpl将参数sysFlag和commitOffsetValue传递到broker端。

broker端处理拉取消息的Processor是PullMessageProcessor。我们重点观察一下processRequest方法中设置offset的代码。

        boolean storeOffsetEnable = brokerAllowSuspend;
        storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
        storeOffsetEnable = storeOffsetEnable
            && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
        if (storeOffsetEnable) {
            this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
        }

方式2:拉取消息时顺带提交

消费端每隔5s,通过persistAll向broker端提交offset值。

上翻逻辑,在MQClientInstance启动的时候会注册定时任务,每5s执行一次persistAllConsumerOffset(),最终调用到persistAll()。

    private void persistAllConsumerOffset() {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            impl.persistConsumerOffset();
        }
    }
    //DefaultMQPushConsumerImpl
    @Override
    public void persistConsumerOffset() {
        try {
            this.makeSureStateOK();
            Set<MessageQueue> mqs = new HashSet<MessageQueue>();
            Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
            mqs.addAll(allocateMq);

            this.offsetStore.persistAll(mqs);
        } catch (Exception e) {
            log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
        }
    }
//RemoteBrokerOffsetStore
@Override
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;

        final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();

        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            AtomicLong offset = entry.getValue();
            if (offset != null) {
                if (mqs.contains(mq)) {
                    try {
                        this.updateConsumeOffsetToBroker(mq, offset.get());
                        log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                            this.groupName,
                            this.mQClientFactory.getClientId(),
                            mq,
                            offset.get());
                    } catch (Exception e) {
                        log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                    }
                } else {
                    unusedMQ.add(mq);
                }
            }
        }

        if (!unusedMQ.isEmpty()) {
            for (MessageQueue mq : unusedMQ) {
                this.offsetTable.remove(mq);
                log.info("remove unused mq, {}, {}", mq, this.groupName);
            }
        }
    }

broker端

broker端的具体逻辑在ConsumerManageProcessor,处理(1)查询消费者列表(2)更新offset(3)查询offset 三种请求。具体处理逻辑在ConsumerOffsetManager

ConsumerOffsetManager

使用

ConcurrentMap<String/* [email protected] */, ConcurrentMap<Integer/*queueid*/, Long>>

来存储所有offset信息,大map的key为[email protected],小map的key为queueid。对内存的读写操作这里不再详细分析。
分析到此处,offset全是保存在内存中,而这个offset必然是要持久化的,持久化的逻辑在哪里?
ConsumerOffsetManager继承自ConfigManager,ConfigManager的load方法是从文件中加载数据到内存,persist方法是从内存持久化数据到文件,查找具体调用,在BrokerController中有如下逻辑:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

即默认5s将数据持久化到文件中

 

参考文档:

1. https://blog.csdn.net/yankunhaha/article/details/100061337

2. https://blog.csdn.net/GAMEloft9/article/details/103999826

相关标签: rocketmq系统