
RocketMQ Source Code Reading ---- Delay Messages

程序员文章站 (Programmer Article Site) 2022-07-14 23:41:19
...

I. Concepts

Scheduled message: delivered at a specific point in time, e.g. at 2018-07-09 00:00:00.

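Delay message: delivered once a given amount of time has passed, e.g. 10 seconds later. (The open-source version of RocketMQ only supports a fixed set of delay durations, called delay levels.)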

II. Test Case

package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

/**
 * @author zhuyibin
 */
public class ScheduledMessageProducer {

    public static void main(String[] args) {
        // The name server address must be configured, e.g. via producer.setNamesrvAddr(...)
        // or the NAMESRV_ADDR environment variable.
        DefaultMQProducer producer = new DefaultMQProducer("scheduler_message");
        try {
            producer.start();

            for (int i = 0; i < 3; i++) {
                Message message = new Message("SchedulerTopic", ("Hello scheduled message " + i).getBytes());
                // Delay level 3: this message will be delivered to the consumer 10 seconds later.
                message.setDelayTimeLevel(3);
                producer.send(message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the producer's resources once all messages are sent.
            producer.shutdown();
        }
    }
}

In the code above, setDelayTimeLevel() selects one of the following delays:

DelayTimeLevel    Delay
1                 1s
2                 5s
3                 10s
4                 30s
5                 1m
6                 2m
7                 3m
8                 4m
9                 5m
10                6m
11                7m
12                8m
13                9m
14                10m
15                20m
16                30m
17                1h
18                2h

(s = seconds, m = minutes, h = hours)

These mappings are loaded when the broker initializes, along the following call chain:

BrokerStartup.java
public static BrokerController createBrokerController(String[] args) {
    ...
    boolean initResult = controller.initialize();
    ...
}

BrokerController.java
public boolean initialize() throws CloneNotSupportedException {
    ...
    messageStore.load();
    ...
}

DefaultMessageStore.java
...
scheduleMessageService.load();
...

ScheduleMessageService.java
...
parseDelayLevel();
...


// Maps each delay level to its concrete delay in milliseconds
ConcurrentMap<Integer /* level */, Long /* delay timeMillis */> delayLevelTable
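The body of parseDelayLevel() is elided above. Below is a minimal sketch of what such a parser has to do, assuming the levels are configured as one space-separated string in the format of the table above (to my knowledge this is the broker's messageDelayLevel setting, whose default is exactly the 18 levels listed). The class DelayLevelParser and its unit table (s, m, h, d) are hypothetical illustrations, not RocketMQ's code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of parseDelayLevel(): "1s 5s ... 2h" -> {level -> delay in ms}.
public class DelayLevelParser {

    public static ConcurrentMap<Integer, Long> parse(String levelString) {
        // Milliseconds per time unit (assumed unit set: s, m, h, d)
        Map<String, Long> unitMillis = new HashMap<>();
        unitMillis.put("s", 1000L);
        unitMillis.put("m", 1000L * 60);
        unitMillis.put("h", 1000L * 60 * 60);
        unitMillis.put("d", 1000L * 60 * 60 * 24);

        ConcurrentMap<Integer, Long> delayLevelTable = new ConcurrentHashMap<>();
        String[] items = levelString.trim().split("\\s+");
        for (int i = 0; i < items.length; i++) {
            String item = items[i];
            String unit = item.substring(item.length() - 1);
            Long millisPerUnit = unitMillis.get(unit);
            if (millisPerUnit == null) {
                throw new IllegalArgumentException("unknown time unit in '" + item + "'");
            }
            long amount = Long.parseLong(item.substring(0, item.length() - 1));
            // Levels are 1-based: the first item is level 1, so queueId = level - 1 starts at 0
            delayLevelTable.put(i + 1, amount * millisPerUnit);
        }
        return delayLevelTable;
    }

    public static void main(String[] args) {
        ConcurrentMap<Integer, Long> table =
            parse("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h");
        System.out.println(table.get(3));  // 10000   (level 3 = 10s, as used in the test case)
        System.out.println(table.get(18)); // 7200000 (level 18 = 2h)
        System.out.println(table.size());  // 18, i.e. the maximum delay level
    }
}
```

Because the levels are numbered contiguously from 1, the largest key in this sketch equals the number of entries, which plays the role of the maximum delay level.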

III. Source Code Analysis

The overall flow is shown in the figure below:

[Figure: overall flow of delay message processing]

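  1. The delay message goes through the normal CommitLog storage path.

  2. Because it is a delay message, its topic and queueId are switched (before it is written) to the topic and queueId designated for the delay queue. It is therefore indexed in ConsumeQueue(Scheduler) instead of ConsumeQueue(Normal), so it is not consumed right away.

  3. A delay-queue scheduler periodically checks whether the messages in each schedule queue have reached their delivery time.

  4. When a message is due, its original topic and queueId are restored and it is written back into the broker's message store, which puts it into ConsumeQueue(Normal). From there it is consumed normally.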

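Using levels means that messages with different delays go into different queues. Within one queue every message has the same delay, so messages appended in store order are already ordered by delivery time: no sorting is needed, and inserting a message is an O(1) append.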
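To see why per-level queues need no sorting, here is a small in-memory model. It is a hypothetical sketch (LevelQueues, put and pollDue are invented names): it keeps an ArrayDeque per level where the broker keeps a ConsumeQueue file plus a saved offset, but the ordering argument is the same. As long as messages arrive in store-time order, each queue stays sorted by delivery time, so only its head ever has to be checked:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model: one FIFO queue per delay level.
public class LevelQueues {

    // A pending message and the time it becomes due.
    static final class Entry {
        final String payload;
        final long deliverAt;

        Entry(String payload, long deliverAt) {
            this.payload = payload;
            this.deliverAt = deliverAt;
        }
    }

    private final Map<Integer, Long> delayByLevel;
    private final Map<Integer, ArrayDeque<Entry>> queues = new HashMap<>();

    LevelQueues(Map<Integer, Long> delayByLevel) {
        this.delayByLevel = delayByLevel;
        for (Integer level : delayByLevel.keySet()) {
            queues.put(level, new ArrayDeque<>());
        }
    }

    // O(1) append. Callers add messages in store-time order (as the CommitLog does);
    // since every entry in a level has the same delay, each queue stays sorted by deliverAt.
    void put(String payload, int level, long storeTime) {
        queues.get(level).addLast(new Entry(payload, storeTime + delayByLevel.get(level)));
    }

    // Collects every due message. Only queue heads need checking: if a head is not due,
    // nothing behind it in the same level is due either.
    List<String> pollDue(long now) {
        List<String> due = new ArrayList<>();
        for (ArrayDeque<Entry> queue : queues.values()) {
            while (!queue.isEmpty() && queue.peekFirst().deliverAt <= now) {
                due.add(queue.pollFirst().payload);
            }
        }
        return due;
    }

    public static void main(String[] args) {
        Map<Integer, Long> delays = new HashMap<>();
        delays.put(1, 1000L);  // level 1 = 1s
        delays.put(3, 10000L); // level 3 = 10s
        LevelQueues lq = new LevelQueues(delays);
        lq.put("a", 3, 0);     // due at 10000
        lq.put("b", 1, 500);   // due at 1500
        lq.put("c", 3, 2000);  // due at 12000
        System.out.println(lq.pollDue(1500));  // [b]
        System.out.println(lq.pollDue(11000)); // [a]
        System.out.println(lq.pollDue(12000)); // [c]
    }
}
```

The trade-off of this design is flexibility: arbitrary delays would break the "same delay per queue" property and require a sorted structure (with O(log n) inserts) instead.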

 

The delay-queue handling happens before the message is stored, so let's look at the sequence diagram for message storage:

[Figure: sequence diagram of message storage]

In the CommitLog class, the putMessage(final MessageExtBrokerInner msg) method:

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

    ......

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery: handling for delay-queue messages
        if (msg.getDelayTimeLevel() > 0) {
            // Levels above the configured maximum are clamped to the maximum level
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            // Replace topic and queueId with the ones designated for the delay queue
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId so they can be restored later
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    ......

}

As the code shows, the original topic is replaced with SCHEDULE_TOPIC_XXXX and the queueId with delayLevel - 1:

ScheduleMessageService.java

public static int delayLevel2QueueId(final int delayLevel) {
    return delayLevel - 1;
}
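A hypothetical, simplified model of the swap in putMessage() and of the reverse step performed when the message is due (messageTimeup() in executeOnTimeup()). SimpleMessage, the REAL_TOPIC/REAL_QID keys and restore() are invented for illustration and only approximate MessageConst.PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID; clearing the delay level in restore() reflects the assumption that the re-stored message must not be delayed a second time:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the topic/queueId swap; not RocketMQ's classes.
public class ScheduleTopicSwap {

    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    // Illustrative property keys standing in for MessageConst.PROPERTY_REAL_TOPIC / PROPERTY_REAL_QUEUE_ID
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QUEUE_ID = "REAL_QID";

    static final class SimpleMessage {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();

        SimpleMessage(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    // Mirrors the putMessage() branch: clamp the level, back up the real target, then swap.
    static void toScheduleQueue(SimpleMessage msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return; // not a delay message: leave it alone
        }
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel;
        }
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QUEUE_ID, String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = delayLevel2QueueId(msg.delayLevel);
    }

    // Reverse step when the message is due. Precondition: toScheduleQueue() was applied.
    // The delay level is cleared so the re-stored message is not delayed again.
    static void restore(SimpleMessage msg) {
        msg.topic = msg.properties.remove(REAL_TOPIC);
        msg.queueId = Integer.parseInt(msg.properties.remove(REAL_QUEUE_ID));
        msg.delayLevel = 0;
    }

    public static void main(String[] args) {
        SimpleMessage m = new SimpleMessage("SchedulerTopic", 1, 3);
        toScheduleQueue(m, 18);
        System.out.println(m.topic + " queueId=" + m.queueId); // SCHEDULE_TOPIC_XXXX queueId=2
        restore(m);
        System.out.println(m.topic + " queueId=" + m.queueId); // SchedulerTopic queueId=1
    }
}
```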

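Next, consider ScheduleMessageService, the class that performs this queueId conversion. As its name suggests, it is the service that handles scheduled messages. Its start() method is shown below. The for loop shows that every delay level gets its own timer task, starting from its own offset, so each level's progress is tracked independently of the others.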

// Called when the broker starts
public void start() {

    // During broker initialization, parseDelayLevel() converted every level's delay into
    // milliseconds and stored it in delayLevelTable; each level's queueId is fixed as well
    for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
        Integer level = entry.getKey();
        Long timeDelay = entry.getValue();
        // Resume from this level's saved progress, or from the beginning if there is none
        Long offset = this.offsetTable.get(level);
        if (null == offset) {
            offset = 0L;
        }

        if (timeDelay != null) {
            this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
        }
    }

    // Periodically persist offsetTable (the per-level scan progress)
    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                ScheduleMessageService.this.persist();
            } catch (Throwable e) {
                log.error("scheduleAtFixedRate flush exception", e);
            }
        }
    }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
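start() gives every level its own one-shot task on a single java.util.Timer. Three properties of java.util.Timer explain the shape of this code and of run() further down: all tasks execute one after another on one background thread; an exception that escapes a task's run() terminates that thread, after which the Timer rejects new tasks; and a given TimerTask instance can be scheduled only once, so every follow-up round needs a new task object. The sketch below is hypothetical (LevelTimerSketch, LevelTask, runDemo and the shortened delays are invented for illustration); it injects a few failures to show the catch-and-reschedule pattern keeping every level alive:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the Timer pattern; names and delays are invented for the demo.
public class LevelTimerSketch {
    static final long FIRST_DELAY_MS = 10; // stand-in for FIRST_DELAY_TIME
    static final long RETRY_DELAY_MS = 20; // stand-in for DELAY_FOR_A_PERIOD

    // A single background thread runs the tasks of every level, one at a time.
    final Timer timer = new Timer("level-timer-sketch", true);
    final ConcurrentMap<Integer, Long> offsetTable = new ConcurrentHashMap<>();
    private final AtomicInteger failuresToInject;
    private final CountDownLatch levelsDone;

    LevelTimerSketch(int failuresToInject, int levelCount) {
        this.failuresToInject = new AtomicInteger(failuresToInject);
        this.levelsDone = new CountDownLatch(levelCount);
    }

    void start(Map<Integer, Long> delayLevelTable) {
        for (Integer level : delayLevelTable.keySet()) {
            long offset = offsetTable.getOrDefault(level, 0L); // no saved progress -> start at 0
            timer.schedule(new LevelTask(level, offset), FIRST_DELAY_MS);
        }
    }

    final class LevelTask extends TimerTask {
        private final int level;
        private final long offset;

        LevelTask(int level, long offset) {
            this.level = level;
            this.offset = offset;
        }

        @Override
        public void run() {
            try {
                scanOnce();
            } catch (Exception e) {
                // Never let the exception escape: it would end the Timer thread for all levels.
                // Retry with a NEW task object, because a TimerTask can be scheduled only once.
                timer.schedule(new LevelTask(level, offset), RETRY_DELAY_MS);
            }
        }

        private void scanOnce() {
            if (failuresToInject.getAndDecrement() > 0) {
                throw new IllegalStateException("simulated scan failure on level " + level);
            }
            offsetTable.put(level, offset + 1); // pretend one entry was delivered
            levelsDone.countDown();             // the real task would schedule its next round here
        }
    }

    // Runs one successful round per level (after the injected failures) and returns
    // the offsets, or null if that did not happen within timeoutMs.
    static Map<Integer, Long> runDemo(Map<Integer, Long> delayLevelTable, int failures, long timeoutMs) {
        LevelTimerSketch sketch = new LevelTimerSketch(failures, delayLevelTable.size());
        sketch.start(delayLevelTable);
        try {
            if (!sketch.levelsDone.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                return null;
            }
            return new HashMap<>(sketch.offsetTable);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            sketch.timer.cancel();
        }
    }

    public static void main(String[] args) {
        Map<Integer, Long> levels = new HashMap<>();
        levels.put(1, 1000L);
        levels.put(2, 5000L);
        levels.put(3, 10000L);
        System.out.println(runDemo(levels, 2, 5000)); // {1=1, 2=1, 3=1}
    }
}
```

Because everything shares one thread, a slow scan on one level delays the others; the design favors simplicity over isolation.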

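The Timer is single-threaded: all of its tasks run one after another on one background thread, so a slow task holds up the others. Each scheduled task executes the following run method: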

// DeliverDelayedMessageTimerTask (an inner class of ScheduleMessageService)
@Override
public void run() {
    try {
        this.executeOnTimeup();
    } catch (Exception e) {
        // XXX: warn and notify me
        log.error("ScheduleMessageService, executeOnTimeup exception", e);
        // Retry the same level and offset after DELAY_FOR_A_PERIOD
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
            this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
    }
}
public void executeOnTimeup() {
    // Find the ConsumeQueue of the schedule topic for this level's queueId. This queue never
    // goes through normal delivery; only this timer task scans it.
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));

    long failScheduleOffset = offset; // offset to resume from if this round cannot scan

    if (cq != null) {
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                // CQ_STORE_UNIT_SIZE = 20: each ConsumeQueue entry is 8 + 4 + 8 bytes
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    long offsetPy = bufferCQ.getByteBuffer().getLong(); // offset of the message in the CommitLog
                    int sizePy = bufferCQ.getByteBuffer().getInt(); // size of the message
                    // Normally the tag hashcode; for the delay queue it was replaced by the
                    // delivery timestamp when the ConsumeQueue entry was built
                    long tagsCode = bufferCQ.getByteBuffer().getLong();

                    if (cq.isExtAddr(tagsCode)) {
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        } else {
                            //can't find ext content.So re compute tags code.
                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                tagsCode, offsetPy, sizePy);
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }

                    long now = System.currentTimeMillis();
                    // If the stored delivery time is later than now + this level's delay, deliver now;
                    // otherwise keep the delivery time (tagsCode) unchanged
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    long countdown = deliverTimestamp - now;

                    if (countdown <= 0) { // the message is due
                        MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy); // read the message from the CommitLog

                        if (msgExt != null) {
                            try {
                                // Restore the original topic and queueId
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                // Store the restored message in the CommitLog as an ordinary message,
                                // so it enters the normal delivery flow
                                PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.defaultMessageStore
                                        .putMessage(msgInner);

                                if (putMessageResult != null
                                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    continue;
                                } else {
                                    // XXX: warn and notify me
                                    // Re-put failed: retry from this entry after DELAY_FOR_A_PERIOD and save the offset
                                    log.error(
                                        "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                        msgExt.getTopic(), msgExt.getMsgId());
                                    ScheduleMessageService.this.timer.schedule(
                                        new DeliverDelayedMessageTimerTask(this.delayLevel,
                                            nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                        nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                /*
                                 * XXX: warn and notify me
                                 */
                                log.error(
                                    "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                        + offsetPy + ",sizePy=" + sizePy, e);
                            }
                        }
                    } else { // Not due yet: schedule the next scan after countdown ms and save the offset
                        ScheduleMessageService.this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                            countdown);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for

                // Every entry in the buffer was handled: continue after it once DELAY_FOR_A_WHILE has passed
                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {

                bufferCQ.release();
            }
        } // end of if (bufferCQ != null)
        else {

            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                    + cqMinOffset + ", queueId=" + cq.getQueueId());
            }
        }
    } // end of if (cq != null)

    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
        failScheduleOffset), DELAY_FOR_A_WHILE);
}
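Stripped of storage details, the scan above reduces to: read 20-byte entries in order, hand on every entry whose delivery time has passed, and stop at the first entry that is not yet due, remembering its index so the next timer run starts there. A hypothetical sketch (ScheduleQueueScan, buildUnits, scan and ScanResult are invented names); it assumes the tagsCode slot already holds the delivery timestamp and skips the ConsumeQueueExt and correctDeliverTimestamp() steps:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of one executeOnTimeup() scan over schedule-queue entries.
public class ScheduleQueueScan {
    // 8-byte CommitLog offset + 4-byte message size + 8-byte tagsCode (here: delivery timestamp)
    static final int CQ_STORE_UNIT_SIZE = 20;

    static final class ScanResult {
        final List<Long> delivered; // CommitLog offsets of messages that were due
        final long nextOffset;      // logical ConsumeQueue index to resume from
        final long rescheduleInMs;  // delay before the next scan

        ScanResult(List<Long> delivered, long nextOffset, long rescheduleInMs) {
            this.delivered = delivered;
            this.nextOffset = nextOffset;
            this.rescheduleInMs = rescheduleInMs;
        }
    }

    // Each unit is {commitLogOffset, size, deliverTimestamp}.
    static ByteBuffer buildUnits(long[][] units) {
        ByteBuffer buf = ByteBuffer.allocate(units.length * CQ_STORE_UNIT_SIZE);
        for (long[] u : units) {
            buf.putLong(u[0]);
            buf.putInt((int) u[1]);
            buf.putLong(u[2]);
        }
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    // offset = logical index of the first unit in buf; delayForAWhileMs = pause when buf is exhausted.
    static ScanResult scan(ByteBuffer buf, long offset, long now, long delayForAWhileMs) {
        List<Long> delivered = new ArrayList<>();
        int size = buf.remaining();
        int i = 0;
        for (; i < size; i += CQ_STORE_UNIT_SIZE) {
            long offsetPy = buf.getLong();
            int sizePy = buf.getInt(); // the real code passes this to lookMessageByOffset()
            long deliverTimestamp = buf.getLong();

            long nextOffset = offset + (i / CQ_STORE_UNIT_SIZE); // index of the current unit
            long countdown = deliverTimestamp - now;
            if (countdown <= 0) {
                delivered.add(offsetPy); // real code: restore topic/queueId and putMessage()
                continue;
            }
            // First entry that is not due: later entries in this level are not due either.
            return new ScanResult(delivered, nextOffset, countdown);
        }
        // Every unit was due: resume after the last one, after a short pause.
        return new ScanResult(delivered, offset + (i / CQ_STORE_UNIT_SIZE), delayForAWhileMs);
    }

    public static void main(String[] args) {
        long[][] units = {{1000, 100, 500}, {1100, 120, 900}, {1220, 80, 2000}};
        ScanResult r = scan(buildUnits(units), 10, 1000, 100);
        System.out.println(r.delivered + " next=" + r.nextOffset + " in=" + r.rescheduleInMs);
        // [1000, 1100] next=12 in=1000
    }
}
```

Stopping at the first entry that is not due relies on the per-level ordering: all entries in one level share the same delay, so nothing behind that entry can become due earlier, and the next scan can simply be scheduled countdown milliseconds later.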

How tagsCode is handled when the ConsumeQueue entry is built:

CommitLog.java
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
    final boolean readBody) {

    .....

    // Timing message processing
    {
        String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
            int delayLevel = Integer.parseInt(t);

            if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
            }

            if (delayLevel > 0) {
                // For delay messages, tagsCode holds the delivery timestamp instead of the tag hashcode
                tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                    storeTimestamp);
            }
        }
    }

    .....

}

// ScheduleMessageService.java
public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
    Long time = this.delayLevelTable.get(delayLevel);
    if (time != null) { // storeTimestamp: when the message was stored on the broker
        return time + storeTimestamp; // the exact time at which the message becomes due
    }

    return storeTimestamp + 1000; // unknown level: fall back to 1 second
}
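Put together, a delay message's ConsumeQueue entry carries storeTimestamp + delay(level), and the scan compares that value with the clock after passing it through correctDeliverTimestamp(), whose body is not shown in this article. A standalone sketch of both helpers follows. DeliverTime is a hypothetical class, and the second method follows the rule stated in the comment inside executeOnTimeup() (a delivery time later than now + the level's delay is replaced by now). My guess is that this guards against clock changes on the broker, but the excerpt does not say so:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone version of the two delivery-time helpers.
public class DeliverTime {
    private final Map<Integer, Long> delayLevelTable;

    DeliverTime(Map<Integer, Long> delayLevelTable) {
        this.delayLevelTable = delayLevelTable;
    }

    // Same logic as computeDeliverTimestamp() above: store time + the level's delay.
    long computeDeliverTimestamp(int delayLevel, long storeTimestamp) {
        Long time = delayLevelTable.get(delayLevel);
        if (time != null) {
            return time + storeTimestamp;
        }
        return storeTimestamp + 1000; // unknown level: fall back to 1 s
    }

    // Rule described for correctDeliverTimestamp(): a delivery time later than
    // now + this level's delay is treated as implausible, so the message is delivered now.
    // Assumes delayLevel is present in delayLevelTable.
    long correctDeliverTimestamp(int delayLevel, long now, long deliverTimestamp) {
        long maxTimestamp = now + delayLevelTable.get(delayLevel);
        return deliverTimestamp > maxTimestamp ? now : deliverTimestamp;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = new HashMap<>();
        table.put(3, 10000L); // level 3 = 10s
        DeliverTime dt = new DeliverTime(table);

        long due = dt.computeDeliverTimestamp(3, 1_000_000L);
        System.out.println(due);                                            // 1010000
        System.out.println(dt.computeDeliverTimestamp(99, 1_000_000L));     // 1001000
        System.out.println(dt.correctDeliverTimestamp(3, 1_000_000L, due)); // 1010000 (plausible, kept)
        System.out.println(dt.correctDeliverTimestamp(3, 990_000L, due));   // 990000 (too far ahead, deliver now)
    }
}
```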