Rocketmq offset进度管理
下文以DefaultMQPushConsumerImpl集群模式消费消息为例。
概述
消息消费完成后,需要将消费进度存储起来,即前面提到的offset。广播模式下,同消费组的消费者相互独立,消费进度要单独存储;集群模式下,同一条消息只会被同一个消费组消费一次,消费进度会参与到负载均衡中,故消费进度是需要共享的。
消费者端
提交offset入口
入口在org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult中的最后一段逻辑:
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
------------------省略-----------------------
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
消息消费完成(不论成功或失败)后,将消息从ProcessQueue中移除,同时返回ProcessQueue中最小的offset,使用这个offset值更新消费进度,removeMessage返回的offset有两种情况,一是已经没有消息了,返回
ProcessQueue最大offset+1,二是还有消息,则返回未消费消息的最小offset。举个例子,ProcessQueue中有offset为101-110的10条消息,如果全部消费完了,返回的offset为111;如果101未消费完成,102-110消费完成,则返回的offset为101,这种情况下如果消费者异常退出,会出现重复消费的风险,所以要求消费逻辑幂等。
updateOffset逻辑
看RemoteBrokerOffsetStore的updateOffset()逻辑,将offset更新到内存中,这里RemoteBrokerOffsetStore使用ConcurrentHashMap保存MessageQueue的消费进度:
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}
if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
offsetOld.set(offset);
}
}
}
}
可以看到,这里将offset更新到内存中就返回了,并没有向broker端提交,具体提交逻辑有两种方式:
方式1:拉取消息时顺带提交
来看DefaultMQPushConsumerImpl类的pullMessage方法
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
------------------此处省略500字----------------------
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
代码中,我们可以看到 通过this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);从内存中读到待提交的offset值,并将commitOffsetEnable设置为true. 核心方法pullKernelImpl将参数sysFlag和commitOffsetValue传递到broker端。
broker端处理拉取消息的Processor是PullMessageProcessor。我们重点观察一下processRequest方法中设置offset的代码。
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
方式2:拉取消息时顺带提交
消费端每隔5s,通过persistAll向broker端提交offset值。
上翻逻辑,在MQClientInstance启动的时候会注册定时任务,每5s执行一次persistAllConsumerOffset(),最终调用到persistAll()。
private void persistAllConsumerOffset() {
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
impl.persistConsumerOffset();
}
}
//DefaultMQPushConsumerImpl
@Override
public void persistConsumerOffset() {
try {
this.makeSureStateOK();
Set<MessageQueue> mqs = new HashSet<MessageQueue>();
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
mqs.addAll(allocateMq);
this.offsetStore.persistAll(mqs);
} catch (Exception e) {
log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
}
}
//RemoteBrokerOffsetStore
@Override
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;
final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
if (mqs.contains(mq)) {
try {
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
} else {
unusedMQ.add(mq);
}
}
}
if (!unusedMQ.isEmpty()) {
for (MessageQueue mq : unusedMQ) {
this.offsetTable.remove(mq);
log.info("remove unused mq, {}, {}", mq, this.groupName);
}
}
}
broker端
broker端的具体逻辑在ConsumerManageProcessor,处理(1)查询消费者列表(2)更新offset(3)查询offset 三种请求。具体处理逻辑在ConsumerOffsetManager
ConsumerOffsetManager
使用
ConcurrentMap<String/* [email protected] */, ConcurrentMap<Integer/*queueid*/, Long>>
来存储所有offset信息,大map的key为[email protected],小map的key为queueid。对内存的读写操作这里不再详细分析。
分析到此处,offset全是保存在内存中,而这个offset必然是要持久化的,持久化的逻辑在哪里?
ConsumerOffsetManager继承自ConfigManager,ConfigManager的load方法是从文件中加载数据到内存,persist方法是从内存持久化数据到文件,查找具体调用,在BrokerController中有如下逻辑:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
即默认5s将数据持久化到文件中
参考文档:
1. https://blog.csdn.net/yankunhaha/article/details/100061337
2. https://blog.csdn.net/GAMEloft9/article/details/103999826
推荐阅读
-
rocketmq管理控制台(rocketmq-console)安装记录
-
对如何估算时间的一点想法 估算开发管理进度计划
-
简练软考知识点整理-控制进度管理过程 简练软考项目管理
-
简练软考知识点整理-控制进度管理过程 简练软考项目管理
-
简练软考知识点整理-制定进度计划过程 简练软考项目管理
-
简练软考知识点整理-制定进度计划过程 简练软考项目管理
-
简练软考知识点整理-规划进度管理 简练网软考信息系统项目管理师系统集成项目管理工程师项目管理
-
Kafka偏移量(Offset)管理
-
Apache RocketMQ官方文档中文精简版-CLI管理工具
-
二、CLI管理工具(RocketMQ提供CLI管理工具带来查询,管理和诊断各种问题)