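RocketMQ Source Code Reading: Delayed Messages
I. Concepts
Scheduled message: the message is delivered at a specific point in time, for example at 2018-07-09 00:00:00.
Delayed message: the message is delivered after a given amount of time has passed, for example 10 seconds from now. (The open-source version of RocketMQ only supports a fixed set of delay levels.)
II. Test Case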
package org.apache.rocketmq.example.quickstart;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
/**
* @author zhuyibin
*/
public class ScheduledMessageProducer {
public static void main(String[] args) {
try {
DefaultMQProducer producer = new DefaultMQProducer("scheduler_message");
producer.start();
for (int i = 0; i < 3; i++) {
Message message = new Message("SchedulerTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
producer.send(message);
}
// Release the producer's resources once all messages have been sent.
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
The values accepted by setDelayTimeLevel() in the code above map to delays as follows:
| DelayTimeLevel | Delay |
| --- | --- |
| 1 | 1s |
| 2 | 5s |
| 3 | 10s |
| 4 | 30s |
| 5 | 1m |
| 6 | 2m |
| 7 | 3m |
| 8 | 4m |
| 9 | 5m |
| 10 | 6m |
| 11 | 7m |
| 12 | 8m |
| 13 | 9m |
| 14 | 10m |
| 15 | 20m |
| 16 | 30m |
| 17 | 1h |
| 18 | 2h |
The broker loads this level-to-delay mapping when it initializes:
BrokerStartup.java
public static BrokerController createBrokerController(String[] args) {
...
boolean initResult = controller.initialize();
...
}
BrokerController.java
public boolean initialize() throws CloneNotSupportedException {
...
messageStore.load()
...
}
DefaultMessageStore.java
....
scheduleMessageService.load()
....
ScheduleMessageService.java
...
parseDelayLevel()
...
// Maps each level to its concrete delay in milliseconds
ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable
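To make the level table concrete, here is a minimal sketch of how a level string could be turned into a map like delayLevelTable. It is not the actual parseDelayLevel() implementation: the class name, the LEVELS constant, and the supported unit suffixes are assumptions chosen to match the table above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch only; names here are hypothetical, not RocketMQ's own.
class DelayLevelParserSketch {
    // Assumed configuration string, one entry per level, matching the table above.
    static final String LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Milliseconds represented by a single unit suffix.
    static long unitMillis(char unit) {
        switch (unit) {
            case 's': return 1000L;
            case 'm': return 60L * 1000L;
            case 'h': return 60L * 60L * 1000L;
            default: throw new IllegalArgumentException("unknown unit: " + unit);
        }
    }

    // Builds level -> delay in milliseconds; levels are 1-based.
    static ConcurrentMap<Integer, Long> parse(String levelString) {
        ConcurrentMap<Integer, Long> table = new ConcurrentHashMap<>();
        String[] parts = levelString.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            char unit = part.charAt(part.length() - 1);
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            table.put(i + 1, amount * unitMillis(unit));
        }
        return table;
    }

    public static void main(String[] args) {
        ConcurrentMap<Integer, Long> table = parse(LEVELS);
        System.out.println("level 3 -> " + table.get(3) + " ms");
    }
}
```

With this table, setDelayTimeLevel(3) from the test case corresponds to a delay of 10,000 ms.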
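III. Source Code Analysis
The overall flow is as follows:
- The delayed message is committed to the CommitLog like any other message.
- Because it is a delayed message, its topic and queueId are changed to those of the delay queue. It is therefore dispatched to a ConsumeQueue (Schedule) instead of a ConsumeQueue (Normal), and is not consumed right away.
- The delay-queue scheduler polls each delay queue to check whether its messages have reached their delivery time.
- When a message is due, its original topic and queueId are restored and it is put back into the broker, where it lands in a ConsumeQueue (Normal) and can be consumed as usual.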
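Because delays are expressed as levels and each level has its own queue, no sorting is needed: all messages in one queue share the same delay, so arrival order is also delivery order, and inserting into the queue is O(1).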
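The O(1) argument can be illustrated with a small in-memory model. This is a sketch, not broker code: the class and method names are made up, and it assumes store timestamps arrive in non-decreasing order, so appending to the tail keeps each per-level queue ordered by delivery time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of "one FIFO queue per delay level".
class LevelQueuesSketch {
    private final long[] delayMillis;                            // index = level - 1
    private final List<Deque<Long>> queues = new ArrayList<>();  // each holds delivery timestamps

    LevelQueuesSketch(long... delayMillis) {
        this.delayMillis = delayMillis.clone();
        for (int i = 0; i < delayMillis.length; i++) {
            queues.add(new ArrayDeque<>());
        }
    }

    // O(1) insert: same delay for every message in the queue, so the tail is always the latest.
    void put(int level, long storeTimestamp) {
        queues.get(level - 1).addLast(storeTimestamp + delayMillis[level - 1]);
    }

    // Removes every message of this level that is due at 'now'; stops at the first one that is not.
    int pollDue(int level, long now) {
        Deque<Long> queue = queues.get(level - 1);
        int delivered = 0;
        while (!queue.isEmpty() && queue.peekFirst() <= now) {
            queue.pollFirst();
            delivered++;
        }
        return delivered;
    }
}
```

A single queue with arbitrary delays would need a sorted structure (for example a heap, with O(log n) insert); fixing the delay per queue is what removes that cost.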
Delayed messages are handled before the message is stored, so we look at the message storage path next.
In the CommitLog class, the putMessage(final MessageExtBrokerInner msg) method:
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
......
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery: handling for delayed messages
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// Replace topic and queueId with the ones designated for the delay queue
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId so they can be restored later
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
......
}
As the code shows, the original topic is replaced with SCHEDULE_TOPIC_XXXX and the queueId with delayLevel - 1.
ScheduleMessageService.java
public static int delayLevel2QueueId(final int delayLevel) {
return delayLevel - 1;
}
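The swap and the later restore can be sketched with a simplified message type. Everything here is illustrative: Msg, toScheduleQueue, and restore are hypothetical names, the property keys are stand-ins for MessageConst.PROPERTY_REAL_TOPIC and MessageConst.PROPERTY_REAL_QUEUE_ID, and clearing the delay level on restore is an assumption made so that the message is not delayed a second time.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of "back up the real topic/queueId, then swap in the schedule ones".
class TopicSwapSketch {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    static final String REAL_TOPIC = "REAL_TOPIC"; // stand-in property key
    static final String REAL_QID = "REAL_QID";     // stand-in property key

    static class Msg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    // Mirrors the putMessage() branch: clamp the level, back up, then rewrite.
    static void toScheduleQueue(Msg msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message
        }
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel;
        }
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QID, String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = msg.delayLevel - 1; // same rule as delayLevel2QueueId
    }

    // Reverses the swap once the message is due.
    static void restore(Msg msg) {
        String realTopic = msg.properties.get(REAL_TOPIC);
        String realQid = msg.properties.get(REAL_QID);
        if (realTopic == null || realQid == null) {
            return; // nothing was backed up
        }
        msg.topic = realTopic;
        msg.queueId = Integer.parseInt(realQid);
        msg.delayLevel = 0; // assumption: cleared so the message is stored as a normal one
        msg.properties.remove(REAL_TOPIC);
        msg.properties.remove(REAL_QID);
    }
}
```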
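Note the class that converts the queueId, ScheduleMessageService. Judging by its name, it is the service that handles scheduled messages, and it has a start method. The for loop in that method shows that each delay level gets its own timer task, so scanning one level does not affect the others.
// Called when the broker starts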
public void start() {
// During broker initialization, parseDelayLevel() converts each level's delay to milliseconds and stores it in delayLevelTable; the queueIds are fixed as well
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
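The scheduling pattern in start() can be tried out in isolation with java.util.Timer. The sketch below is an assumption-laden miniature: the class and method names are invented, and each task only records its level instead of scanning a queue. It does keep the key property that one Timer (one thread) serves one task per level.

```java
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: one TimerTask per delay level on a single shared Timer.
class PerLevelTimerSketch {
    static Set<Integer> scanEachLevelOnce(Map<Integer, Long> delayLevelTable, long firstDelayMillis) {
        Timer timer = new Timer("schedule-sketch", true); // daemon thread
        Set<Integer> scanned = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(delayLevelTable.size());
        for (Map.Entry<Integer, Long> entry : delayLevelTable.entrySet()) {
            final int level = entry.getKey();
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    scanned.add(level); // stand-in for scanning this level's queue
                    done.countDown();
                }
            }, firstDelayMillis);
        }
        try {
            done.await(5, TimeUnit.SECONDS); // wait until every level ran once
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.cancel();
        }
        return scanned;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = new TreeMap<>();
        table.put(1, 1000L);
        table.put(2, 5000L);
        table.put(3, 10000L);
        System.out.println("levels scanned: " + scanEachLevelOnce(table, 100L));
    }
}
```

Since a Timer runs its tasks on a single thread, the per-level tasks execute one after another, and a slow task delays the ones queued behind it.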
Timer is a single-threaded, blocking scheduler. It executes the run method below:
// DeliverDelayedMessageTimerTask.java
@Override
public void run() {
try {
this.executeOnTimeup();
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}
public void executeOnTimeup() {
// Fetch the ConsumeQueue for the schedule topic and the queueId of this delay level. Such a queue never goes through normal delivery; only this timer task scans it
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset; // offset to resume from
if (cq != null) {
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // CQ_STORE_UNIT_SIZE = 20: the size of one ConsumeQueue entry (8 + 4 + 8 bytes)
long offsetPy = bufferCQ.getByteBuffer().getLong(); // offset in the CommitLog
int sizePy = bufferCQ.getByteBuffer().getInt(); // message size
long tagsCode = bufferCQ.getByteBuffer().getLong(); // normally the tag hash code; for a delay queue it holds the delivery timestamp, set when the ConsumeQueue entry is built
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
// If the stored delivery time is later than now + the level's delay, deliver now; otherwise keep the stored delivery time (tagsCode)
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown <= 0) { // the delivery time has been reached
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy); // load the message
if (msgExt != null) {
try {
// Restore the topic and queueId that were changed earlier
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
// Store the restored message in the CommitLog as a normal message, so it enters the regular publish flow
PutMessageResult putMessageResult =
ScheduleMessageService.this.defaultMessageStore
.putMessage(msgInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// XXX: warn and notify me. The re-put failed: schedule another Timer check and update the offset
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
/*
* XXX: warn and notify me
*/
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else { // not due yet: schedule the next Timer scan and update the offset
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
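The core of executeOnTimeup() is a scan over fixed-size entries that stops at the first message that is not yet due. The sketch below reproduces only that part on a plain ByteBuffer. The class, the ScanResult type, and the parameter names are hypothetical; correctDeliverTimestamp is written from the comment in the code above rather than copied from the RocketMQ source.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of scanning 20-byte ConsumeQueue entries for due messages.
class ConsumeQueueScanSketch {
    static final int CQ_STORE_UNIT_SIZE = 20; // 8 (CommitLog offset) + 4 (size) + 8 (tagsCode)

    static class ScanResult {
        final List<long[]> due = new ArrayList<>(); // {offsetPy, sizePy} of each due message
        long nextOffset;                            // queue offset to resume from
        long nextCheckDelayMillis;                  // how long to wait before the next scan
    }

    // A delivery time further away than now + the level's delay is treated as due now.
    static long correctDeliverTimestamp(long now, long deliverTimestamp, long levelDelayMillis) {
        long maxTimestamp = now + levelDelayMillis;
        return deliverTimestamp > maxTimestamp ? now : deliverTimestamp;
    }

    static ScanResult scan(ByteBuffer buffer, long startOffset, long now,
                           long levelDelayMillis, long idleDelayMillis) {
        ScanResult result = new ScanResult();
        int i = 0;
        for (; i + CQ_STORE_UNIT_SIZE <= buffer.limit(); i += CQ_STORE_UNIT_SIZE) {
            long offsetPy = buffer.getLong(i);      // position in the CommitLog
            int sizePy = buffer.getInt(i + 8);      // message size
            long tagsCode = buffer.getLong(i + 12); // delivery timestamp for delay queues
            long deliverTimestamp = correctDeliverTimestamp(now, tagsCode, levelDelayMillis);
            long countdown = deliverTimestamp - now;
            if (countdown > 0) {
                // Entries are in delivery order, so nothing after this one is due either.
                result.nextOffset = startOffset + i / CQ_STORE_UNIT_SIZE;
                result.nextCheckDelayMillis = countdown;
                return result;
            }
            result.due.add(new long[] {offsetPy, sizePy});
        }
        // Reached the end of the buffer: resume after the last entry, after a short pause.
        result.nextOffset = startOffset + i / CQ_STORE_UNIT_SIZE;
        result.nextCheckDelayMillis = idleDelayMillis;
        return result;
    }
}
```

Rescheduling with the remaining countdown, rather than a fixed interval, means the next scan fires at about the moment the head of the queue becomes due.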
How tagsCode is handled when the ConsumeQueue is built:
CommitLog.java
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
final boolean readBody) {
.....
// Timing message processing
{
String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
int delayLevel = Integer.parseInt(t);
if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
}
if (delayLevel > 0) {
tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
storeTimestamp);
}
}
}
.....
}
// ScheduleMessageService.java
public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
Long time = this.delayLevelTable.get(delayLevel);
if (time != null) { // storeTimestamp: the time at which the broker stored the message
return time + storeTimestamp; // the absolute time at which the message becomes due
}
return storeTimestamp + 1000;
}
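As a worked example of computeDeliverTimestamp, the sketch below fills a small table by hand and evaluates two cases. The class name and the three table entries are assumptions for illustration; the arithmetic follows the method shown above, including the 1000 ms fallback for an unknown level.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: delivery time = store time + the level's delay.
class DeliverTimestampSketch {
    private final Map<Integer, Long> delayLevelTable = new HashMap<>();

    DeliverTimestampSketch() {
        delayLevelTable.put(1, 1000L);   // 1s
        delayLevelTable.put(2, 5000L);   // 5s
        delayLevelTable.put(3, 10000L);  // 10s
    }

    long computeDeliverTimestamp(int delayLevel, long storeTimestamp) {
        Long time = delayLevelTable.get(delayLevel);
        if (time != null) {
            return time + storeTimestamp;
        }
        return storeTimestamp + 1000; // unknown level: fall back to one second
    }

    public static void main(String[] args) {
        DeliverTimestampSketch sketch = new DeliverTimestampSketch();
        long storeTimestamp = System.currentTimeMillis();
        long deliverAt = sketch.computeDeliverTimestamp(3, storeTimestamp);
        System.out.println("due in " + (deliverAt - storeTimestamp) + " ms");
    }
}
```

This value is what ends up in the tagsCode slot of the delay queue's ConsumeQueue entry, so the scanner can compare it with the current time without loading the message body.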