欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RcoketMQ源码内部机制和应用场景的分享

程序员文章站 2022-03-23 12:54:07
...

        RcoketMQ源码内部机制和应用场景的分享

简单说明一下图中箭头含义,从 Broker 开始,Broker Master1 Broker Slave1 是主从结构,它们之间会进行数据同步,即 Date Sync。同时每个 Broker NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer

Producer NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。Producer 只能将消息发送到 Broker master.

Consumer NameServer 集群中的其中一个节点(随机选择)建立长连接,同时和提供 Topic 服务的 Master Slave建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息

 

 

重要概念

NameServer: 提供轻量级的服务发现和路由。 每个 NameServer 记录完整的路由信息,提供等效的读写服务,并支持快速存储扩展。

Broker: 消息中转角色,通过提供轻量级的 Topic Queue 机制来处理消息存储,同时支持推(push)和拉(pull)模式以及主从结构的容错机制。

Producer:生产者,产生消息的实例,拥有相同 Producer Group Producer 组成一个集群。

Consumer消费者,接收消息进行消费的实例,拥有相同 Consumer Group Consumer组成消费组

ProducerGroup

可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个Producer对象

producerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,如果Producer中途意外宕机,Broker

主动回调Producer Group内的任意一台机器来确认事务状态。Producer在启动时,会    选择一个namesrv相连,通过topic关系找到

broker,并和存有topic的所有master broker相连,也就是说,消息只会发到masterbroker上去。

 

ConsumerGroup
 

可以是多台机器,也可以是多个进程,或者是一个进程的多个Consumer对象每个Consumer Group会分别将该Topic的消息消费一遍

每一   个Consumer Group内,各Consumer通过负载均衡的方式消费该Topic的消息

将一个Consumer Group对应业务系统中的一个独立的业务模块,是一个比较值得推荐的ConsuerGroup划分方法。

 

BrokerGroup

 Topic之间的关系是多对多的关系

一个Topic由多个Broker Group提供服务即RocketMQ用户指南中提到的多Master,或多MasterSlave模式。

Topic由一个Broker Group提供服务即RocketMQ用户指南中提到的单Master模式(包含Slave或不包含Slave)。

 

存储特点

(1)消息主体以及元数据都存储在CommitLog当中

(2)Consume Queue相当于kafka中的partition,是一个逻辑队列(也可以理解为数据字典)存储了这个Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。

(3)每次读取消息队列先读取consumerQueue,然后再通过consumerQueue去commitLog中拿到消息主体。

 

为什么要这样设计?

rocketMQ的设计理念很大程度借鉴了kafka,所以有必要介绍下kafka的存储结构设计:

RcoketMQ源码内部机制和应用场景的分享

 

 

RocketMQ类似,每个Topic有多个partition(queue),kafka的每个partition都是一个独立的物理文件,消息直接从里面读写。

根据之前阿里中间件团队的测试,一旦kafkaTopicpartitoin数量过多,队列文件会过多,会给磁盘的IO读写造成很大的压力,造成tps迅速下降。

所以RocketMQ进行了上述这样设计,consumerQueue中只存储很少的数据,消息主体都是通过CommitLog来进行读写。

 

 

没有一种方案是银弹,那么RocketMQ这样处理有什么 优缺点 ?

优点:

队列轻量化,单个队列数据量非常少。对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致IOWAIT增高。

缺点:

写虽然完全是顺序写,但是读却变成了完全的随机读。

读一条消息,会先读ConsumeQueue,再读CommitLog,增加了开销。

要保证CommitLogConsumeQueue完全的一致,增加了编程的复杂度。

 

底层实现

1 MappedByteBuffer

RocketMQ中的文件读写主要就是通过MappedByteBuffer进行操作,来进行文件映射。利用了nio中的FileChannel模型,可以直接将物理文件映射到缓冲区,提高读写速度

 

2 page cache

通俗的说:pageCache是系统读写磁盘时为了提高性能将部分文件缓存到内存中,下面是详细解释

page cache:这里所提及到的page cache,是linuxvfs虚拟文件系统层的cache层,一般pageCache默认是4K大小,它被操作系统的内存管理模块所管理,文件被映射到内存,一般都是被mmap()函数映射上去的。

mmap()函数会返回一个指针,指向逻辑地址空间中的逻辑地址,逻辑地址通过MMU映射到page cache上。

 

 

master slave 异步复制模式

在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave

Master节点可读可写,但是 slave 只能读不能写,类似于 mysql 的主备模式。

优点: 在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。

缺点:使用异步复制的同步方式有可能会有消息丢失的问题。

RcoketMQ源码内部机制和应用场景的分享

 

master slave 同步双写模式

同多 master slave 异步复制模式类似,区别在于 master slave 之间的数据同步方式。

优点:同步双写的同步模式能保证数据不丢失。

缺点:发送单个消息 RT 会略长,性能相比异步复制低10%左右。

RcoketMQ源码内部机制和应用场景的分享

 

                             RcoketMQ源码内部机制和应用场景的分享

(图中的目录树中,一个0目录就是一个MapedFileQueue,一个commitLog目录也是一个MapedFileQueue,右侧的000000000就是一个MapedFile)

MapedFile:每个MapedFile对应的就是一个物理二进制文件了,在代码中负责文件读写的就是MapedByteBufferfileChannel。相当于对pageCache文件的封装。

 

 

介绍一下各个关键对象的作用:

DefaultMessageStore:这是存储模块里面最重要的一个类,包含了很多对存储文件的操作API,其他模块对消息实体的操作都是通过DefaultMessageStore进行操作。

commitLog:commitLog是所有物理消息实体的存放文件,这篇文章的架构图里可以看得到。其中commitLog持有了MapedFileQueue

consumeQueue:consumeQueue就对应了相对的每个topic下的一个逻辑队列(rocketMQ中叫quequekafka的概念里叫partition它是一个逻辑队列!存储了消息在commitLog中的offSet

indexFile:存储具体消息索引的文件,以一个类似hash桶的数据结构进行索引维护。

MapedFileQueue:这个对象包含一个MapedFileList,维护了多个mapedFile,升序存储。一个MapedFileQueue针对的就是一个目录下的所有二进制存储文件。理论上无线增长,定期删除过期文件。

 

什么时候清理物理消息文件

消息存储在CommitLog之后,的确是会被清理的,但是这个清理只会在以下任一条件成立才会批量删除消息文件(CommitLog):

消息文件过期(默认72小时),且到达清理时点(默认是凌晨4点),删除过期文件。

消息文件过期(默认72小时),且磁盘空间达到了水位线(默认75%),删除过期文件。

磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。

注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务RocketMQ官方建议Linux下文件系统改为Ext4,对于文件删除操作,相比Ext3有非常明显的提升

 

ConsumeQueue的消息处理

• 生产者发送消息只是把消息主体存储到了物理文件commitLog,但是并没有把消息处理到consumeQueue文件中,那么到底是哪里存入的?

任务处理一般都分为两种:

一种是同步,把消息主体存入到commitLog的同时把消息存入consumeQueuerocketMQ的早期版本就是这样处理的。

另一种是异步处理,起一个线程,不停的轮询,将当前的consumeQueue中的offSetcommitLog中的offSet进行对比,将多出来的offSet进行解析,然后putconsumeQueue中的MapedFile中。

问题:为什么要改同步为异步处理?应该是为了增加发送消息的吞吐量。

 

RocketMq官方源码结构

                                               RcoketMQ源码内部机制和应用场景的分享

 

rocketmq-broker                broker可单独启动

rocketmq-client                  提供发送、接受消息的客户端APIrocketmq-common

rocketmq-console-ng         rocketmq的监控平台单独启动  

rocketmq-example             rocketmq用法示例

rocketmq-filtersrv              消息过滤器Server,需要注意的是,要实现这种过滤,需要上传代码到MQ

rocketmq-namesrv             namesrv单独启动

rocketmq-remoting            基于Netty4client/server + fastjson序列化 + 自定义二进制协议

rocketmq-srvutil

rocketmq-store                    消息、索引存储等

rocketmq-tools                    命令行工具

 

Eclipse中启动RocketMQ的方法

RcoketMQ源码内部机制和应用场景的分享

 

RocketMq的监控平台

RcoketMQ源码内部机制和应用场景的分享

 

ConsumeFromWhere 

消费者从那个位置消费,分别为:

 
1 CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 
2 CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费 
3 CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费 

以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始

 

重复消费问题

Exactly Only Once

(1). 发送消息阶段,不允许发送重复的消息。

(2). 消费消息阶段,不允许消费重复的消息

只有以上两个条件都满足情况下,才能认为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。所以RocketMQ为了追求高性能,并不保证此特性,要求在业务上进行去重,也就是说消费消息要做到幂等性。RocketMQ虽然不能严格保证不重复,但是正常情况下很少会出现重复发送、消费情况,只有网络异常,Consumer启停等异常情况下会出现消息重复

 

RocketMq种消息类型

1.普通消费

不追求时间顺序,只要把生产出来的事件全部消费完就可以。这种可以用并行的方式处理,效率高很多: 

 consumer.registerMessageListener(new MessageListenerConcurrently() {

           @Override

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

                    ConsumeConcurrentlyContext context) {

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

             }

     }

2. 顺序消费

严格按照时间消费的模式,这种模式需要用串行方式,生产者生产的时候,这时候生产者需要往特定的队列里有序push

实现了MessageListenerOrderly表示一个队列只会被一个线程取到,第二个线程无法访问这个队列

consumer.registerMessageListener(new MessageListenerOrderly() {

          @Override

            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,

                    ConsumeOrderlyContext context) {

                    return ConsumeOrderlyStatus.CONSUME_SUCCESS;

             }

}

3.事务消费

可以解决如下类似场景的问题:A用户和B用户的账户体系不在同一台服务器上面,现在A用户向B用户转账100元,为了提高执行效率,就采用消息队列的方式实现异步处理。大致逻辑是A用户扣款100元,然后发送消息给消息队列,B用户的程序从队列中获取转账信息并向B用户上账100

通过消息的异步事务,可以保证本地事务消息发送同时执行成功或失败,从而保证了数据的最终一致性

  

rocketmq在发送事物消息时,会先发送一个prepared消息,返回消息所在地址。然后再执行本地事物,根据事物执行结果去更新prepared消息状态。消息接收者只能消费消息集群中消息状态为已提交的消息

 

发送prepare消息复用了普通消息发送,只是给消息增加

                                 RcoketMQ源码内部机制和应用场景的分享

 

rocketmq在发送事物消息时,会先发送一个prepared消息,返回消息所在地址。然后再执行本地事物,根据事物执行结果去更新prepared消息状态。消息接收者只能消费消息集群中消息状态为已提交的消息。

producer.setTransactionCheckListener(transactionCheckListener);

producer.start();

TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();

try {

Message msg = new Message("TopicTransactionTest", "transaction", "KEY", ("Hello RocketMQ ").getBytes());

SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);

System.out.println(sendResult); Thread.sleep(10);

 } catch (MQClientException e) {

            e.printStackTrace();

}

 

producer执行本地事务

public class TransactionExecuterImpl implements LocalTransactionExecuter {

public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {

System.out.println("执行本地事务msg = " + new String(msg.getBody()));

String tags = msg.getTags();

if (tags.equals("transaction2")) {

System.out.println("======我的操作============,失败了 -进行ROLLBACK");

 return LocalTransactionState.ROLLBACK_MESSAGE; }

 return LocalTransactionState.COMMIT_MESSAGE;

// return LocalTransactionState.UNKNOW;

}

}

}

 

 

3.事务消费

/** * broker回查producer */

public class TransactionCheckListenerImpl implements TransactionCheckListener {

// private AtomicInteger transactionIndex = new AtomicInteger(0);

//在这里,我们可以根据由MQ回传的key去数据库查询,这条数据到底是成功了还是失败了。

public LocalTransactionState checkLocalTransactionState(MessageExt msg) {

System.out.println("未决事务,服务器回查客户端msg =" + new String(msg.getBody().toString()));

// return LocalTransactionState.ROLLBACK_MESSAGE; r

return LocalTransactionState.COMMIT_MESSAGE;

// return LocalTransactionState.UNKNOW;

}

}

RocketMQ发送消息的三方式

1可靠的同步

异步传输通常用于响应时间敏感的业务场景。

public static void main(String[] args) throws Exception {

DefaultMQProducer producer = new

DefaultMQProducer("please_rename_unique_group_name");

producer.start();

for (int i = 0; i < 100; i++) {

Message msg = new Message("TopicTest" TagA",

("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));

SendResult sendResult = producer.send(msg);

System.out.printf("%s%n", sendResult);

}

producer.shutdown();

}

 

 

2可靠异步

    异步传输通常用于响应时间敏感的业务场景。

public static void main(String[] args) throws Exception {

DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");

producer.start();

producer.setRetryTimesWhenSendAsyncFailed(0);

for (int i = 0; i < 100; i++) {
    
  final int index = i;

  Message msg = new Message("TopicTest","TagA","OrderID188","Hello 
                world".getBytes(RemotingHelper.DEFAULT_CHARSET));

  producer.send(msg, new SendCallback() {

     @Override

     public void onSuccess(SendResult sendResult) {

           System.out.printf("%-10d OK %s %n", index,

           sendResult.getMsgId());
 
     }

    @Override

    public void onException(Throwable e) {

       System.out.printf("%-10d Exception %s %n", index, e);

       e.printStackTrace();

   }

  });

 }

 producer.shutdown();

}

 

 

3单向传输

     应用:单向传输用于要求中等可靠性的情况,如日志采集。

public class OnewayProducer {

public static void main(String[] args) throws Exception{



  DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");

  producer.start();

  for (int i = 0; i < 100; i++) {

    Message msg = new Message("TopicTest, "TagA" ("Hello 
     RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));

     producer.sendOneway(msg);

  }

  producer.shutdown();

}

 

发送的一些其他说明 
 默认发送超时为
3s

消息超过4k,即启用消息的压缩。

发送失败,默认重发2次。

消息最大限制为4M,即超过4M会提示发送失败。

相关标签: rocketmq