RcoketMQ源码内部机制和应用场景的分享
•简单说明一下图中箭头含义,从 Broker 开始,Broker Master1 和 Broker Slave1 是主从结构,它们之间会进行数据同步,即 Date Sync。同时每个 Broker 与NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer 中。
•Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。Producer 只能将消息发送到 Broker master.
•Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,同时和提供 Topic 服务的 Master 和 Slave建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息
重要概念
•NameServer: 提供轻量级的服务发现和路由。 每个 NameServer 记录完整的路由信息,提供等效的读写服务,并支持快速存储扩展。
•Broker: 消息中转角色,通过提供轻量级的 Topic 和 Queue 机制来处理消息存储,同时支持推(push)和拉(pull)模式以及主从结构的容错机制。
•Producer:生产者,产生消息的实例,拥有相同 Producer Group 的 Producer 组成一个集群。
•Consumer:消费者,接收消息进行消费的实例,拥有相同 Consumer Group 的Consumer组成消费组
•ProducerGroup
可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个Producer对象
producerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,如果Producer中途意外宕机,Broker
会主动回调Producer Group内的任意一台机器来确认事务状态。Producer在启动时,会 选择一个namesrv相连,通过topic关系找到
broker,并和存有topic的所有master broker相连,也就是说,消息只会发到master的broker上去。
•ConsumerGroup
可以是多台机器,也可以是多个进程,或者是一个进程的多个Consumer对象每个Consumer Group会分别将该Topic的消息消费一遍;
在每一 个Consumer Group内,各Consumer通过负载均衡的方式消费该Topic的消息
将一个Consumer Group对应业务系统中的一个独立的业务模块,是一个比较值得推荐的ConsuerGroup划分方法。
•BrokerGroup
和Topic之间的关系是多对多的关系
一个Topic由多个Broker Group提供服务即《RocketMQ用户指南》中提到的多Master,或多Master多Slave模式。
一个Topic由一个Broker Group提供服务即《RocketMQ用户指南》中提到的单Master模式(包含Slave或不包含Slave)。
存储特点
(1)消息主体以及元数据都存储在CommitLog当中
(2)Consume Queue相当于kafka中的partition,是一个逻辑队列(也可以理解为数据字典),存储了这个Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。
(3)每次读取消息队列先读取consumerQueue,然后再通过consumerQueue去commitLog中拿到消息主体。
为什么要这样设计?
rocketMQ的设计理念很大程度借鉴了kafka,所以有必要介绍下kafka的存储结构设计:
和RocketMQ类似,每个Topic有多个partition(queue),kafka的每个partition都是一个独立的物理文件,消息直接从里面读写。
根据之前阿里中间件团队的测试,一旦kafka中Topic的partitoin数量过多,队列文件会过多,会给磁盘的IO读写造成很大的压力,造成tps迅速下降。
所以RocketMQ进行了上述这样设计,consumerQueue中只存储很少的数据,消息主体都是通过CommitLog来进行读写。
没有一种方案是银弹,那么RocketMQ这样处理有什么 优缺点 ?
优点:
队列轻量化,单个队列数据量非常少。对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致IOWAIT增高。
缺点:
写虽然完全是顺序写,但是读却变成了完全的随机读。
读一条消息,会先读ConsumeQueue,再读CommitLog,增加了开销。
要保证CommitLog与ConsumeQueue完全的一致,增加了编程的复杂度。
底层实现
•1 MappedByteBuffer
•RocketMQ中的文件读写主要就是通过MappedByteBuffer进行操作,来进行文件映射。利用了nio中的FileChannel模型,可以直接将物理文件映射到缓冲区,提高读写速度。
•2 page cache
•通俗的说:pageCache是系统读写磁盘时为了提高性能将部分文件缓存到内存中,下面是详细解释:
•page cache:这里所提及到的page cache,是linux中vfs虚拟文件系统层的cache层,一般pageCache默认是4K大小,它被操作系统的内存管理模块所管理,文件被映射到内存,一般都是被mmap()函数映射上去的。
•mmap()函数会返回一个指针,指向逻辑地址空间中的逻辑地址,逻辑地址通过MMU映射到page cache上。
多 master 多 slave 异步复制模式
在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。
Master节点可读可写,但是 slave 只能读不能写,类似于 mysql 的主备模式。
优点: 在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。
缺点:使用异步复制的同步方式有可能会有消息丢失的问题。
多 master 多 slave 同步双写模式
同多 master 多 slave 异步复制模式类似,区别在于 master 和 slave 之间的数据同步方式。
优点:同步双写的同步模式能保证数据不丢失。
缺点:发送单个消息 RT 会略长,性能相比异步复制低10%左右。
(图中的目录树中,一个0目录就是一个MapedFileQueue,一个commitLog目录也是一个MapedFileQueue,右侧的000000000就是一个MapedFile。)
MapedFile:每个MapedFile对应的就是一个物理二进制文件了,在代码中负责文件读写的就是MapedByteBuffer和fileChannel。相当于对pageCache文件的封装。
•介绍一下各个关键对象的作用:
•DefaultMessageStore:这是存储模块里面最重要的一个类,包含了很多对存储文件的操作API,其他模块对消息实体的操作都是通过DefaultMessageStore进行操作。
•commitLog:commitLog是所有物理消息实体的存放文件,这篇文章的架构图里可以看得到。其中commitLog持有了MapedFileQueue。
•consumeQueue:consumeQueue就对应了相对的每个topic下的一个逻辑队列(rocketMQ中叫queque,kafka的概念里叫partition), 它是一个逻辑队列!存储了消息在commitLog中的offSet。
•indexFile:存储具体消息索引的文件,以一个类似hash桶的数据结构进行索引维护。
•MapedFileQueue:这个对象包含一个MapedFileList,维护了多个mapedFile,升序存储。一个MapedFileQueue针对的就是一个目录下的所有二进制存储文件。理论上无线增长,定期删除过期文件。
什么时候清理物理消息文件
•消息存储在CommitLog之后,的确是会被清理的,但是这个清理只会在以下任一条件成立才会批量删除消息文件(CommitLog):
•消息文件过期(默认72小时),且到达清理时点(默认是凌晨4点),删除过期文件。
•消息文件过期(默认72小时),且磁盘空间达到了水位线(默认75%),删除过期文件。
•磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。
•注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。RocketMQ官方建议Linux下文件系统改为Ext4,对于文件删除操作,相比Ext3有非常明显的提升
ConsumeQueue的消息处理
• 生产者发送消息只是把消息主体存储到了物理文件commitLog中,但是并没有把消息处理到consumeQueue文件中,那么到底是哪里存入的?
•任务处理一般都分为两种:
•一种是同步,把消息主体存入到commitLog的同时把消息存入consumeQueue,rocketMQ的早期版本就是这样处理的。
•另一种是异步处理,起一个线程,不停的轮询,将当前的consumeQueue中的offSet和commitLog中的offSet进行对比,将多出来的offSet进行解析,然后put到consumeQueue中的MapedFile中。
•问题:为什么要改同步为异步处理?应该是为了增加发送消息的吞吐量。
RocketMq官方源码结构
rocketmq-broker broker可单独启动
rocketmq-client 提供发送、接受消息的客户端API。rocketmq-common
rocketmq-console-ng rocketmq的监控平台单独启动
rocketmq-example rocketmq用法示例
rocketmq-filtersrv 消息过滤器Server,需要注意的是,要实现这种过滤,需要上传代码到MQ!
rocketmq-namesrv namesrv可单独启动
rocketmq-remoting 基于Netty4的client/server + fastjson序列化 + 自定义二进制协议
rocketmq-srvutil
rocketmq-store 消息、索引存储等
rocketmq-tools 命令行工具
在Eclipse中启动RocketMQ的方法
RocketMq的监控平台
•ConsumeFromWhere
消费者从那个位置消费,分别为:
1 CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
2 CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
3 CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费
以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始
•重复消费问题
Exactly Only Once
(1). 发送消息阶段,不允许发送重复的消息。
(2). 消费消息阶段,不允许消费重复的消息。
只有以上两个条件都满足情况下,才能认为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。所以RocketMQ为了追求高性能,并不保证此特性,要求在业务上进行去重,也就是说消费消息要做到幂等性。RocketMQ虽然不能严格保证不重复,但是正常情况下很少会出现重复发送、消费情况,只有网络异常,Consumer启停等异常情况下会出现消息重复
RocketMq的三种消息类型
•1.普通消费
不追求时间顺序,只要把生产出来的事件全部消费完就可以。这种可以用并行的方式处理,效率高很多:
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
•2. 顺序消费
严格按照时间消费的模式,这种模式需要用串行方式,生产者生产的时候,这时候生产者需要往特定的队列里有序push
实现了MessageListenerOrderly表示一个队列只会被一个线程取到,第二个线程无法访问这个队列
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
return ConsumeOrderlyStatus.CONSUME_SUCCESS;
}
}
•3.事务消费
可以解决如下类似场景的问题:A用户和B用户的账户体系不在同一台服务器上面,现在A用户向B用户转账100元,为了提高执行效率,就采用消息队列的方式实现异步处理。大致逻辑是A用户扣款100元,然后发送消息给消息队列,B用户的程序从队列中获取转账信息并向B用户上账100元
通过消息的异步事务,可以保证本地事务和消息发送同时执行成功或失败,从而保证了数据的最终一致性
rocketmq在发送事物消息时,会先发送一个prepared消息,返回消息所在地址。然后再执行本地事物,根据事物执行结果去更新prepared消息状态。消息接收者只能消费消息集群中消息状态为已提交的消息。
发送prepare消息复用了普通消息发送,只是给消息增加
rocketmq在发送事物消息时,会先发送一个prepared消息,返回消息所在地址。然后再执行本地事物,根据事物执行结果去更新prepared消息状态。消息接收者只能消费消息集群中消息状态为已提交的消息。
producer.setTransactionCheckListener(transactionCheckListener);
producer.start();
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
try {
Message msg = new Message("TopicTransactionTest", "transaction", "KEY", ("Hello RocketMQ ").getBytes());
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
System.out.println(sendResult); Thread.sleep(10);
} catch (MQClientException e) {
e.printStackTrace();
}
producer执行本地事务
public class TransactionExecuterImpl implements LocalTransactionExecuter {
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
System.out.println("执行本地事务msg = " + new String(msg.getBody()));
String tags = msg.getTags();
if (tags.equals("transaction2")) {
System.out.println("======我的操作============,失败了 -进行ROLLBACK");
return LocalTransactionState.ROLLBACK_MESSAGE; }
return LocalTransactionState.COMMIT_MESSAGE;
// return LocalTransactionState.UNKNOW;
}
}
}
•3.事务消费
/** * broker回查producer */
public class TransactionCheckListenerImpl implements TransactionCheckListener {
// private AtomicInteger transactionIndex = new AtomicInteger(0);
//在这里,我们可以根据由MQ回传的key去数据库查询,这条数据到底是成功了还是失败了。
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.println("未决事务,服务器回查客户端msg =" + new String(msg.getBody().toString()));
// return LocalTransactionState.ROLLBACK_MESSAGE; r
return LocalTransactionState.COMMIT_MESSAGE;
// return LocalTransactionState.UNKNOW;
}
}
RocketMQ发送消息的三种方式
•1可靠的同步
异步传输通常用于响应时间敏感的业务场景。
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message("TopicTest" TagA",
("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
•2可靠的异步
异步传输通常用于响应时间敏感的业务场景。
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
Message msg = new Message("TopicTest","TagA","OrderID188","Hello
world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
producer.shutdown();
}
•3单向传输
应用:单向传输用于要求中等可靠性的情况,如日志采集。
public class OnewayProducer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message("TopicTest, "TagA" ("Hello
RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
}
producer.shutdown();
}
发送的一些其他说明
默认发送超时为3s。
消息超过4k,即启用消息的压缩。
发送失败,默认重发2次。
消息最大限制为4M,即超过4M会提示发送失败。