欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RocketMQ中的顺序消息

程序员文章站 2022-07-14 23:42:35
...

我们在项目开发过程中,有需要使用RocketMQ顺序消息的场景,该如何使用呢?顺序消息的原理是怎样的呢?本文进行了一些探讨。

一、顺序消息的定义

顺序消息(FIFO:First Input First Output)是一种严格按照顺序进行发布和消费的消息类型。要求消息的发布和消息消费都按照顺序进行。

二、顺序消息的设计

在探讨RocketMQ中的普通消息的实现之前,我们有必要了解一下顺序消息的设计。下面分几种场景进行讨论。

1、发布者、MQ、消费者均为单点时的设计
RocketMQ中的顺序消息
1)Producer发送消息M1至MQ,MQ收到消息后进行返回,Producer再发送M2。

2)Consumer消费M1,消费完进行返回。Consumer消费M2。

基于这种设计,Producer的顺序发送很好实现,Consumer的顺序消费需要考虑M1消费失败、或者返回超时的情况。

消费失败的处理:消费失败意味着收到了明确的失败编码,对于失败的处理可能有多种策略,例如直接抛弃,但是进行记录后人工干预;重试一定的次数,仍然失败则进行记录;重试一定的时间,仍然失败则进行记录等。

返回超时的处理:超时意味着消息可能被正常消费,也可能未被消费。可能的策略有:重试一定的次数或时间,仍然超时则进行记录。但这可能引入新的问题,消息的重复消费,从MQ层面上怎么解?

2、发布者集群、MQ单点、消费者集群时的设计
RocketMQ中的顺序消息
1)Producer1发送消息M1至MQ,MQ收到消息后进行返回,返回时是否需要通知每个Producer呢?我们认为可以只返回给Producer1,由调用方逻辑控制再发送M2。

2)Consumer1消费M1,消费完进行返回。Consumer2消费M2。

关于消费失败或者返回超时的情况,同单点时的设计。

我们观察到Producer的单点部署和集群部署对我们分析问题不会产生太大的干扰,消息发送的顺序性是可以由调用方逻辑控制的,基于此,我们再来考虑下面的情况。

3、发布者集群、MQ集群、消费者集群时的设计
RocketMQ中的顺序消息
1)Producer发送消息M1至MQ,MQ收到消息后进行返回,Producer再发送M2。

2)Consumer消费M1,消费完进行返回,需要同时返回给多个MQ Server,都成功后再消费M2。

这个模型存在这样的问题:如何保证多个MQ Server都收到成功消息?一条MQ的消费需要多个应答,是否合理?考虑到消费失败和消费超时,情况变得愈加复杂。

当MQ变为集群后,顺序消息的设计变得复杂?我们该如何解决呢?当我们直面这个问题时,我们可能需要一个复杂的设计才能解决他,如果在顺序消费的场景,MQ退化为单点,是不是一个更好的方案呢?
RocketMQ中的顺序消息
生产者通过一定的发送方式,保证需要被顺序消费的消息只会被发送到某台MQ Server上,消费者采用类似的方式,只从这台MQ上进行消费,同时考虑到失败和超时的场景,我们的设计方案如上图。

三、测试环境搭建

我们在设计层面上对顺序消息有了一定的了解,下面我们将了解RocketMQ中的实现过程,先看下测试环境搭建的过程。

1、从官网fork代码,并导入到IDEA中。官网:https://rocketmq.apache.org/

2、新增环境变量:ROCKETMQ_HOME=rocketMQ运行目录

3、rocketMQ运行目录下新建目录conf、logs、store

4、复制并修改配置文件:从RocketMQ distribution部署目录中将broker.conf、logback_broker.xml、logback_namesrv.xml文件复制到conf目录中。修改broker.conf。日志文件也可以调整到指定目录。

5、启动name server

6、修改broker启动参数,加上Program arguments= -c D:\idea_rocketmq\rocketmq\conf\broker.conf

7、启动broker:org.apache.rocketmq.broker.BrokerStartup

8、修改producer文件,指定nameServ,启动(example目录下有样例)

9、修改consumer文件,指定nameServ,启动(example目录下有样例)

注:nameServ可以不在显示指定,RocketMQ默认会从http://jmenv.tbsite.net:8080/rocketmq/nsaddr中读取name server的地址,其中jmenv.tbsite.net与nsaddr均可以通过修改配置文件,或者java选项的方式进行替换。通过这种方式指定name server的好处是,可以动态的新增或者删除name server节点,无需重启Broker与客户端。

四、RocketMQ中的普通消息的实现

由于顺序消息与普通消息相比,差异并不大,我们通过解析源码的方式先了解下RocketMQ中的普通消息的实现。解析源码的过程略显枯燥,但又是我们深入了解RocketMQ的必经之路。

在RocketMQ的源码包中,给出了普通消费与顺序消息的demo。路径分别为:

普通消息:package org.apache.rocketmq.example.quickstart

顺序消息:package org.apache.rocketmq.example.ordermessage

4.1、Producer相关

4.1.1、Producer的启动
RocketMQ中的顺序消息
疑问:

1、注册Topic注册的是默认topic,意义是什么?

猜想:

1、在线上环境,一般可能是用RocketMQ提供的mqadmin在不同的broker上创建topic。默认的topic是不被使用的。然而在线下环境,默认topic在自动创建topic的过程中会起到作用。

自动创建topic的动作是发生在send的逻辑里的。这里提前注册默认topic,也是起到初始化的作用。

4.1.2、Producer发送普通消息
RocketMQ中的顺序消息
创建topic的方式

1、使用mqadmin命令创建

在生产环境中,我们更多的是用mqadmin指令生成topic,可以指定topic所在的broker,每个broker上创建queue的数量等。比较灵活。

2、MQ自动生成topic

发送消息时,client查询本地缓存与name server中是否有topic对应的broker与queue的信息,如果没有。会向name server查询MQ默认创建的topic对应的broker信息。默认topic是在broker的启动过程中创建的。然后将默认topic对应的broker信息作为当前topic信息进行返回。

消息发送到broker后,broker会检查此topic的信息是否在本地缓存中,如果没有,会向name server进行注册。

同时RocketMQ有定时任务的机制来修正topic对应的broker信息。

获取topic对应Broker信息策略

RocketMQ中通过失败重试、失败剔除、定时更新的策略保证了系统的高效。不保证从本地缓存中拿到的Broker信息一定是可用的,如果不可用可以尝试重试和剔除,通过定时任务异步的对信息进行修正。这种策略保证了系统的高效。

Producer负载均衡策略

Producer通过轮询本地缓存的queue数组的方式来做负载均衡。有普通模式和Broker故障延迟机制可以选择,Broker故障延迟机制:如果某Broker发生故障,一段时间内都不会尝试使用该Broker,如果该Broker已恢复,则从剔除列表中将此Broker移除。

消息全局唯一主键uniqKey生成规则

全局唯一主键uniqKey = FIX_STRING(10bit)+currentTime(4bit)+count(2bit) 其中FIX_STRING = ip+pid+classLoader取hashCode。

4.2、Consumer相关

在分析Consumer代码前,我们需要了解一些RocketMQ消费的设计方式。

1、Broker与Consumer之间的消息传送有两种方式:推模式、拉模式

推模式:Broker向Consumer推送消息

拉模式:Consumer主动向Broker拉消息

RocketMQ的推模式是基于拉模式,在拉模式上包装了一层,一个拉取任务完成后开始下一个拉取任务。

2、一个消费者组可以包含多个消费者,每个消费者都可以订阅多个主题。

消费者组的消费模式有:集群模式、广播模式

集群模式:topic下的同一条消息只允许被同一个消费者组下的一个消费者消费。

广播模式:topic下的同一条消息可以被同一个消费者组下的所有消费者消费。

3、消息的负载

一个消息在同一时间只允许被一个消费者消费,一个消费者可以同时消费多个消息。

4.2.1、Consumer的启动
RocketMQ中的顺序消息
疑问:

获取订阅信息的逻辑中,订阅关系的来源是什么?通过调用什么方法获得?

4.2.2、消息消费

消息的消费相比于消息的发送显得复杂很多,RocketMQ对这个动作进行了拆分。

1、“消息消费”这个概念或许可以拆解为两个概念,“消息拉取”、“消息业务处理”。即:

消息消费 = 消息拉取 + 消息消费处理,这里的“+”号在RocketMQ中是通过回调函数、异步来实现的,实现了消息拉取和消息消费处理的解耦。

2、消息拉取 = 获取拉取目标信息 + 从目标中进行拉取,在获取拉取目标信息中,我们可以实现负载均衡。

带着上面两点总结再来看,就会显得清晰很多。

RocketMQ中的顺序消息
4.2.3、消息拉取

消息拉取的入口不太直观,在启动过程的最后一步,启动MQClientInstance中的start pull service方法中,进行消息的拉取。

消息拉取的入口为PullMessageService#run()方法,while循环的获取拉取任务,再根据拉取任务拉取消息。

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    //volatile修饰,每次都重新检测此值,可通过其他线程修改此值,停止线程。
    while (!this.isStopped()) {
        try {
            //从pullRequestQueue中获取拉取任务,如果获取不到,线程阻塞,直到有任务可被拉取
            PullRequest pullRequest = this.pullRequestQueue.take();
            //根据拉取任务拉取消息
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

4.2.4、消息拉取-获取拉取目标信息(Consumer负载均衡)

拉取任务pullRequest是在哪里获取的呢?通过跟踪pullRequestQueue的set方法,发现有两个途径:

1、当根据pullRequest未拉取到消息时,会将pullRequest重新放入到队列中。

2、由RebalancePushImpl维护,在上面的启动流程图中我们了解到这是消费端的负载均衡类。
RocketMQ中的顺序消息
负载均衡流程图:
RocketMQ中的顺序消息
4.2.5、消息拉取-从目标中进行拉取

先来看pullMessage方法,主体流程为:

1、拉取请求参数的封装;

2、消息服务器查找并返回消息;

3、客户端处理返回消息。
RocketMQ中的顺序消息
至此,我们基本了解了普通消费发过程。

五、RocketMQ中的顺序消息的实现

5.1、RocketMQ中的顺序发送

了解完上面的内容,再来了解顺序消息则显得简单点。

发送时,只需要改变顺序消息中【负载均衡,获取Queue信息】这一步,改为【通过指定的select方法获取queue信息】即可保证消息按照我们指定的规则存储到对应的queue上,便于消费时按序消费。

示例代码:package org.apache.rocketmq.example.ordermessage

5.2、RocketMQ中的顺序消费

顺序消费的逻辑则稍显复杂,这也是因为消息的拉取和消息的消费是异步的,同时消息的消费是通过线程池管理的,要想实现顺序消费,则需要从消息的拉取和消息的消费分别进行改造。

拉取时,通过判断锁来保证顺序性。如果消息队列未被锁定,则延迟3s再尝试进行拉取动作。如果锁定了才进行拉取。

消费时才进行加锁,加锁入口org.apache.rocketmq.client.impl.consumer.RebalanceImpl#lockAll。

在执行消费时,还会申请新的锁objLock,保证一个消息队列同一时刻只会有一个线程来进行处理。

(图不画了)

六、总结

通过上面的分析,我们大致对顺序消息的设计,RocketMQ中普通消息的发送、接收,以及RocketMQ中的顺序消息实现方式有了一定的了解。但是还有不少细节没有涉及到,例如消息的确认、消息进度管理在本文中没有提及。在阅读的过程中,我也有一些疑问没能得到解决,期待以后回顾时能够有更多的收货。

RocketMQ路由中心的设计、消息存储的机制、网络通信的过程、主从同步的设计、消息过滤的设计、事务消息等都是值得深入研究的模块,也请小伙伴们期待我的后续文章,我们一起再深入学习RocketMQ。