RocketMQ中的顺序消息
我们在项目开发过程中,有需要使用RocketMQ顺序消息的场景,该如何使用呢?顺序消息的原理是怎样的呢?本文进行了一些探讨。
一、顺序消息的定义
顺序消息(FIFO:First Input First Output)是一种严格按照顺序进行发布和消费的消息类型。要求消息的发布和消息消费都按照顺序进行。
二、顺序消息的设计
在探讨RocketMQ中的普通消息的实现之前,我们有必要了解一下顺序消息的设计。下面分几种场景进行讨论。
1、发布者、MQ、消费者均为单点时的设计
1)Producer发送消息M1至MQ,MQ收到消息后进行返回,Producer再发送M2。
2)Consumer消费M1,消费完进行返回。Consumer消费M2。
基于这种设计,Producer的顺序发送很好实现,Consumer的顺序消费需要考虑M1消费失败、或者返回超时的情况。
消费失败的处理:消费失败意味着收到了明确的失败编码,对于失败的处理可能有多种策略,例如直接抛弃,但是进行记录后人工干预;重试一定的次数,仍然失败则进行记录;重试一定的时间,仍然失败则进行记录等。
返回超时的处理:超时意味着消息可能被正常消费,也可能未被消费。可能的策略有:重试一定的次数或时间,仍然超时则进行记录。但这可能引入新的问题,消息的重复消费,从MQ层面上怎么解?
2、发布者集群、MQ单点、消费者集群时的设计
1)Producer1发送消息M1至MQ,MQ收到消息后进行返回,返回时是否需要通知每个Producer呢?我们认为可以只返回给Producer1,由调用方逻辑控制再发送M2。
2)Consumer1消费M1,消费完进行返回。Consumer2消费M2。
关于消费失败或者返回超时的情况,同单点时的设计。
我们观察到Producer的单点部署和集群部署对我们分析问题不会产生太大的干扰,消息发送的顺序性是可以由调用方逻辑控制的,基于此,我们再来考虑下面的情况。
3、发布者集群、MQ集群、消费者集群时的设计
1)Producer发送消息M1至MQ,MQ收到消息后进行返回,Producer再发送M2。
2)Consumer消费M1,消费完进行返回,需要同时返回给多个MQ Server,都成功后再消费M2。
这个模型存在这样的问题:如何保证多个MQ Server都收到成功消息?一条MQ的消费需要多个应答,是否合理?考虑到消费失败和消费超时,情况变得愈加复杂。
当MQ变为集群后,顺序消息的设计变得复杂?我们该如何解决呢?当我们直面这个问题时,我们可能需要一个复杂的设计才能解决他,如果在顺序消费的场景,MQ退化为单点,是不是一个更好的方案呢?
生产者通过一定的发送方式,保证需要被顺序消费的消息只会被发送到某台MQ Server上,消费者采用类似的方式,只从这台MQ上进行消费,同时考虑到失败和超时的场景,我们的设计方案如上图。
三、测试环境搭建
我们在设计层面上对顺序消息有了一定的了解,下面我们将了解RocketMQ中的实现过程,先看下测试环境搭建的过程。
1、从官网fork代码,并导入到IDEA中。官网:https://rocketmq.apache.org/
2、新增环境变量:ROCKETMQ_HOME=rocketMQ运行目录
3、rocketMQ运行目录下新建目录conf、logs、store
4、复制并修改配置文件:从RocketMQ distribution部署目录中将broker.conf、logback_broker.xml、logback_namesrv.xml文件复制到conf目录中。修改broker.conf。日志文件也可以调整到指定目录。
5、启动name server
6、修改broker启动参数,加上Program arguments= -c D:\idea_rocketmq\rocketmq\conf\broker.conf
7、启动broker:org.apache.rocketmq.broker.BrokerStartup
8、修改producer文件,指定nameServ,启动(example目录下有样例)
9、修改consumer文件,指定nameServ,启动(example目录下有样例)
注:nameServ可以不在显示指定,RocketMQ默认会从http://jmenv.tbsite.net:8080/rocketmq/nsaddr中读取name server的地址,其中jmenv.tbsite.net与nsaddr均可以通过修改配置文件,或者java选项的方式进行替换。通过这种方式指定name server的好处是,可以动态的新增或者删除name server节点,无需重启Broker与客户端。
四、RocketMQ中的普通消息的实现
由于顺序消息与普通消息相比,差异并不大,我们通过解析源码的方式先了解下RocketMQ中的普通消息的实现。解析源码的过程略显枯燥,但又是我们深入了解RocketMQ的必经之路。
在RocketMQ的源码包中,给出了普通消费与顺序消息的demo。路径分别为:
普通消息:package org.apache.rocketmq.example.quickstart
顺序消息:package org.apache.rocketmq.example.ordermessage
4.1、Producer相关
4.1.1、Producer的启动
疑问:
1、注册Topic注册的是默认topic,意义是什么?
猜想:
1、在线上环境,一般可能是用RocketMQ提供的mqadmin在不同的broker上创建topic。默认的topic是不被使用的。然而在线下环境,默认topic在自动创建topic的过程中会起到作用。
自动创建topic的动作是发生在send的逻辑里的。这里提前注册默认topic,也是起到初始化的作用。
4.1.2、Producer发送普通消息
创建topic的方式
1、使用mqadmin命令创建
在生产环境中,我们更多的是用mqadmin指令生成topic,可以指定topic所在的broker,每个broker上创建queue的数量等。比较灵活。
2、MQ自动生成topic
发送消息时,client查询本地缓存与name server中是否有topic对应的broker与queue的信息,如果没有。会向name server查询MQ默认创建的topic对应的broker信息。默认topic是在broker的启动过程中创建的。然后将默认topic对应的broker信息作为当前topic信息进行返回。
消息发送到broker后,broker会检查此topic的信息是否在本地缓存中,如果没有,会向name server进行注册。
同时RocketMQ有定时任务的机制来修正topic对应的broker信息。
获取topic对应Broker信息策略
RocketMQ中通过失败重试、失败剔除、定时更新的策略保证了系统的高效。不保证从本地缓存中拿到的Broker信息一定是可用的,如果不可用可以尝试重试和剔除,通过定时任务异步的对信息进行修正。这种策略保证了系统的高效。
Producer负载均衡策略
Producer通过轮询本地缓存的queue数组的方式来做负载均衡。有普通模式和Broker故障延迟机制可以选择,Broker故障延迟机制:如果某Broker发生故障,一段时间内都不会尝试使用该Broker,如果该Broker已恢复,则从剔除列表中将此Broker移除。
消息全局唯一主键uniqKey生成规则
全局唯一主键uniqKey = FIX_STRING(10bit)+currentTime(4bit)+count(2bit) 其中FIX_STRING = ip+pid+classLoader取hashCode。
4.2、Consumer相关
在分析Consumer代码前,我们需要了解一些RocketMQ消费的设计方式。
1、Broker与Consumer之间的消息传送有两种方式:推模式、拉模式
推模式:Broker向Consumer推送消息
拉模式:Consumer主动向Broker拉消息
RocketMQ的推模式是基于拉模式,在拉模式上包装了一层,一个拉取任务完成后开始下一个拉取任务。
2、一个消费者组可以包含多个消费者,每个消费者都可以订阅多个主题。
消费者组的消费模式有:集群模式、广播模式
集群模式:topic下的同一条消息只允许被同一个消费者组下的一个消费者消费。
广播模式:topic下的同一条消息可以被同一个消费者组下的所有消费者消费。
3、消息的负载
一个消息在同一时间只允许被一个消费者消费,一个消费者可以同时消费多个消息。
4.2.1、Consumer的启动
疑问:
获取订阅信息的逻辑中,订阅关系的来源是什么?通过调用什么方法获得?
4.2.2、消息消费
消息的消费相比于消息的发送显得复杂很多,RocketMQ对这个动作进行了拆分。
1、“消息消费”这个概念或许可以拆解为两个概念,“消息拉取”、“消息业务处理”。即:
消息消费 = 消息拉取 + 消息消费处理,这里的“+”号在RocketMQ中是通过回调函数、异步来实现的,实现了消息拉取和消息消费处理的解耦。
2、消息拉取 = 获取拉取目标信息 + 从目标中进行拉取,在获取拉取目标信息中,我们可以实现负载均衡。
带着上面两点总结再来看,就会显得清晰很多。
4.2.3、消息拉取
消息拉取的入口不太直观,在启动过程的最后一步,启动MQClientInstance中的start pull service方法中,进行消息的拉取。
消息拉取的入口为PullMessageService#run()方法,while循环的获取拉取任务,再根据拉取任务拉取消息。
@Override
public void run() {
log.info(this.getServiceName() + " service started");
//volatile修饰,每次都重新检测此值,可通过其他线程修改此值,停止线程。
while (!this.isStopped()) {
try {
//从pullRequestQueue中获取拉取任务,如果获取不到,线程阻塞,直到有任务可被拉取
PullRequest pullRequest = this.pullRequestQueue.take();
//根据拉取任务拉取消息
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
4.2.4、消息拉取-获取拉取目标信息(Consumer负载均衡)
拉取任务pullRequest是在哪里获取的呢?通过跟踪pullRequestQueue的set方法,发现有两个途径:
1、当根据pullRequest未拉取到消息时,会将pullRequest重新放入到队列中。
2、由RebalancePushImpl维护,在上面的启动流程图中我们了解到这是消费端的负载均衡类。
负载均衡流程图:
4.2.5、消息拉取-从目标中进行拉取
先来看pullMessage方法,主体流程为:
1、拉取请求参数的封装;
2、消息服务器查找并返回消息;
3、客户端处理返回消息。
至此,我们基本了解了普通消费发过程。
五、RocketMQ中的顺序消息的实现
5.1、RocketMQ中的顺序发送
了解完上面的内容,再来了解顺序消息则显得简单点。
发送时,只需要改变顺序消息中【负载均衡,获取Queue信息】这一步,改为【通过指定的select方法获取queue信息】即可保证消息按照我们指定的规则存储到对应的queue上,便于消费时按序消费。
示例代码:package org.apache.rocketmq.example.ordermessage
5.2、RocketMQ中的顺序消费
顺序消费的逻辑则稍显复杂,这也是因为消息的拉取和消息的消费是异步的,同时消息的消费是通过线程池管理的,要想实现顺序消费,则需要从消息的拉取和消息的消费分别进行改造。
拉取时,通过判断锁来保证顺序性。如果消息队列未被锁定,则延迟3s再尝试进行拉取动作。如果锁定了才进行拉取。
消费时才进行加锁,加锁入口org.apache.rocketmq.client.impl.consumer.RebalanceImpl#lockAll。
在执行消费时,还会申请新的锁objLock,保证一个消息队列同一时刻只会有一个线程来进行处理。
(图不画了)
六、总结
通过上面的分析,我们大致对顺序消息的设计,RocketMQ中普通消息的发送、接收,以及RocketMQ中的顺序消息实现方式有了一定的了解。但是还有不少细节没有涉及到,例如消息的确认、消息进度管理在本文中没有提及。在阅读的过程中,我也有一些疑问没能得到解决,期待以后回顾时能够有更多的收货。
RocketMQ路由中心的设计、消息存储的机制、网络通信的过程、主从同步的设计、消息过滤的设计、事务消息等都是值得深入研究的模块,也请小伙伴们期待我的后续文章,我们一起再深入学习RocketMQ。