欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rocketmq源码分析、整体结构与类关系设计的思考

程序员文章站 2022-06-24 17:57:32
...
前言:

    本文目的:

       
  • 一个复杂的中间件是如何从整体上分模块以及设计核心类之间关系
  •    
  • 一些重要场景的设计分析与设计模式使用
  •    
  • 有哪些抽取的公共模块,公共底层的东西及如何重用


一、简介:

    RocketMQ是一款分布式、队列模型的消息中间件,是阿里巴巴集团自主研发的专业消息中间件,借鉴参考了JMS规范的MQ实现,更参考了优秀的开源消息中间件KAFKA,实现了业务消峰、分布式事务的优秀框架。
    产品基于高可用分布式集群技术,提供消息订阅和发布、消息轨迹查询、定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。MQ 历史超过9年,为分布式应用系统提供异步解耦、削峰填谷的能力,同时具备海量消息堆积、高吞吐、可靠重试等互联网应用所需的特性,是阿里巴巴双11使用的核心产品。
    本文通过研究这样一个复杂中间件的代码,复盘一个复杂系统的整体设计思路与技巧。网上看单独分析其中部分技术的文章不少,但如果自己从头设计,怎么拆分模块,怎么选择方案?因为时间仓促且能力有限,如有不正确的欢迎指正。
    本文基于4.5.1版本!

二、特点:

2.1 真正分布式系统
    一般的服务都经历这样的过程:
    单机->主从架构->高可用主从架构(自动选主)->分片并备份的分布式集群
    rocketmq是真正的分布式消息队列中间件。

2.2 速度快
    说是参考了kafka,比如顺序写消息到文件,在写消息同时另外构建轻量的队列与消息索引。而有的系统因为对数据要进行复杂处理,所以要单独写blog之类的文件,这里就不需要了。

2.3 局部主从结构
    与kafka不同,这里的broker不是完全对等的,分为多个主broker,每个broker带多个从broker,形成一个小团体,只同步对应的主broker的数据。与redis-cluster类似,主从小组合,组合内部自己切换,降低了复杂性。但elasticsearch的分片又与kafka相似。

2.4 简化的命名服务器集群
    分布式系统一定要有一个配置命名服务,这里没有用zookeeper,也没用其它类似的现成产品,而是相对简化的命名服务。


三、主要结构

3.1 系统结构

rocketmq源码分析、整体结构与类关系设计的思考
            
    
    博客分类: rocketmq 阿里巴巴中间件jms框架 

1) Name Server
    Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

2) Broker
    Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

    每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。

3) Producer
    Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

    Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。

    Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。

4) Consumer
   Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

3.2 源码结构

   源码中的maven模块按如下划分。其中client包含了consumer和producer,而broker中的存贮功能由于相对复杂又独立成模块,remoting是基于netty的公共通讯模块,整体上还是很清晰的。

rocketmq-broker:服务端,接受消息,存储消息,consumer拉取消息

rocketmq-client:消息发送和接收,包含consumer和producer

rocketmq-common:通用的枚举、基类方法、或者数据结构,包名有admin、consumer、filter、hook、message

rocketmq-distribution:脚本、配置模块,RocketMQ编译时,bin目录,benchmark目录,conf目录都是从这个模块编译输出的

rocketmq-example:示例模块

rocketmq-filter:消息过滤器

rocketmq-logappender:日志

rocketmq-logging:日志

rocketmq-namesrv:NameServer,类似服务注册中心,broker在这里注册,consumer和producer在这里找到broker地址

rocketmq-openmessaging:RocketMQ支持openmessaging,详见:http://openmessaging.cloud

rocketmq-remoting:使用netty的客户端、服务端,使用fastjson序列化,自定义二进制协议

rocketmq-srvutil:只有一个ServerUtil类,只提供Server程序依赖,尽可能减少客户端依赖

rocketmq-store:消息存储,索引,consumerLog,commitLog等

rocketmq-tools:命令行工具


四、类关系与设计分析

    从这个总图正式开局,后面开始正式分析。

rocketmq源码分析、整体结构与类关系设计的思考
            
    
    博客分类: rocketmq 阿里巴巴中间件jms框架 

    核心业务模块三个,初看内部的核心类很有层次感,使用通用的底层通讯模块,下面重点介绍client与broker的类关系图与思考。

4.1. client模块

rocketmq源码分析、整体结构与类关系设计的思考
            
    
    博客分类: rocketmq 阿里巴巴中间件jms框架 

此图分析:
    1)从MQClientManager到MQClientInstance 再到DefaultMQProducerImpl DefaultMQPullConsumer有比较明确的包含关系。因为业务上可能一个应用要对接多个MQ,对一个MQ可能生产多种消息,也可能消费消息。
    2)MQClientAPIImpl统一处理与外界的远程调用的功能API,把客户端公共数据,发送者,消费者的请求都放在这里,同时这里还负责处理外部对客户端的请求的处理。为何这些API不分成三个类呢?可能不需要太细吧,也许API中有些公共的东西。
    3)为何clientRemotingProcessor由MQClientInstance产生,并传递给MQClientAPIImpl,再传递给remotingClient?remotingClient是另外的底层公共通讯模块,各自己的处理当然是传过来的用的。但为啥不让MQClientAPIImpl产生而要传过来?也许MQClientAPIImpl使用时,使用MQClientInstance中的数据或工具。
常见的上级对象产生下级使用的对象时,会把this传进去,实现上下级相互直接引用。MQClientInstance产生MQClientAPIImpl时不传自己this,但是在产生(new)处理器时有new ClientRemotingProcessor(this),而产生new PullMessageService(this) new RebalanceService(this)也是这样。
    说明引用关系不宜过多过乱,最好有一个引用核心。MQ客户端调用自己的API接口合理,但API接口反调用MQ客户端直觉上没必要,API要的处理器,MQ客户端给它,处理器中会用到客户端。说明API也是会用到MQ客户端,但就是不直接引用。这样符合职责规范。有点像上级安排下级任务,派个钦即可,下级有事不用汇报上级,调用钦差即可,职责明确。
    4)说到引用核心,MQClientInstance是整个MQ客户端的核心。当DefaultMQProducerImpl发消息时,是this.mQClientFactory.getMQClientAPIImpl().sendMessage(…),当生成topic时,是this.mQClientFactory.getMQAdminImpl().createTopic(…)。当DefaultMQPullConsumer拉消息时,是this.mQClientFactory.getMQClientAPIImpl().pullMessage(…)。都是通过mQClientFactory(MQClientInstance)找到API类,而它们都不直接引用API类操作。也是把引用关系按职责梳理清晰。清晰的引用关系说明开发人员头脑清楚,否则改动维护就会是一团乱麻。我看到过类职责不清,想用哪个类功能就直接引用注入,不考虑整体上的规划,不考虑是否通过核心类使用那个类。

设计细节:
    1)异步发送线程池设计。队列要有最大数量限制,不可*asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000)。new ThreadPoolExecutor的参数要配置好,特别是高并发主业务的线程数与Runtime.getRuntime().availableProcessors()处理器数相关。线程池中线程工厂中要命名new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
    2)remotingClient是通用的底层rpc客户端。既然是通用的,具体业务处理逻辑由上层提供。RemotingClient.registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor)中还可以提供线程池。不提供就用内部的publicExecutor。通用层的任务就是产生一个runnable任务,具体是根据req.code找到处理器来处理。这个runnable对象扔了到传过来的线程池来处理。这样对不同的任务,由上层决定线程池的个性化配置。
    3)Hook设计。doBeforeRpcHooks/ doAfterRpcHooks futurn与callback是常用的,但hook还是用的少。我理解在一个调用过程中,特别是重要的调用,要考虑周全的话,是要可以在正式处理的前后,插入额外的处理过程,以实现最大的灵活性。或者把非主流流程放进去,主线更清晰。这总方式类比如servlet里的filter,spring 中的interceptor,在springContext构建的bean每一个环节,比如bean产生前后,属性设置前后,init与dispose时,都可以插入一些灵活的个性的东西。具体是什么交给外部,自己要实现这个机制。
    具体实现比如:SendMessageHook的实现类SendMessageTraceHookImpl,在发送消息前后,在发送消息上下文中加入了TraceContext。又比如RPCHook的实现类AclClientRPCHook,在doBeforeRequest中会额外加入控制信息。


4.2. broker模块

rocketmq源码分析、整体结构与类关系设计的思考
            
    
    博客分类: rocketmq 阿里巴巴中间件jms框架 

此图分析:
    1) 在broker模块,BrokerController是其中的核心类,BrokerController引用很多类,从功能上的复杂度上看,DefaultMessageStore又是BrokerController下一级的核心类,commitLog又是DefaultMessageStore下一级的核心类。核心类之间围绕着众多的其它功能类为其服务。对外通讯的nettyClient与nettyServer都由BrokerController所引用的类负责。整体上层次感非常清晰,核心类周围的从类众星捧月一般,但从类之间不相互引用,如果需要都通过核心类引用来操作。
    2) 关于数据存贮与高可用,有一个Dledger实现了所谓的openmessagingAPI,内部是。BrokerController根据配置的其中isEnableDLegerCommitLog用来判断是否使用DLeger,默认false是关闭的。在默认情况下使用CommitLog + HAService。使用Dleger时,因为都交给它处理数据记录与选举leader了,但上层还要对其内部的变化感知并处理,比如上报主从变化 给nanmeServer,所以有DledgerRoleChangeHandler类配置给Dleger,等于是监听者,它最重要的引用就是核心的BrokerController。监听者内部得到信息后,产生一个runnable对象,放在自己线程池中处理。
    3) 为了保证各种类型的消息快速存贮,消息本身使用顺序编号的文件,始终加在文件尾部,超过设定后另建新文件存。而另有一个服务reputMessageService不断循环对消息建消费队列,以及建消息索引。都是产生每一个dispatchRequest,由dispatcherList中的建队列建索引的dispatcher依次处理,这种结构下你也很方便在当中增加自己的dispatcher,如果处理耗时,还可以用线程也来处理。
    4) HAService是一主多从broker之间的高可用,看代码貌似只能实现数据的复制,因为updateHaMasterAddress这个messageStore接口的方法会调用HAService.updateHaMasterAddress,但外部没有调用者,如果有,就应该是由一个实现选主协议的功能调用。这个HAService包含一个AcceptSocketService,HAClient与GroupTransferService,这三个都是前面提到的一个通用的抽象类ServiceThread的实现。每个broker即是serverSocket,管理过来的连接,也是SocketChannel去连接其它broker。
    5) 具体的消息顺序写入commitLog见后一张图。

设计细节:
    1)同样的所有线程池的核心线程与最大线程数都是一样,都是配置的。线程中的队列都是设置好容量的LinkedBlockingQueue,所有的线程池都要命名。
    2)ServiceThread是一个非常常用的的抽象类,实现也很多基础的功能,比如启停,等待等等。值得复用。
    3)Mappedfile记录数据到硬盘,也是值得参考的功能,特别是已经封闭到mappedFileQueue中使用。入盘是非常底层的功能了,越是底层越通用。



4.3. namesvr模块
    (略,有空再学习补充)


4.4 文件存盘与消费队列与索引的建立
  
    这方面的文章很多,也很细,很多byteBuffer操作,主要结构与关系见下图。
注意:

       
  • 最底层对应文件的是MappedFile,对应物理文件与相关的属性信息。
  •    
  • 一个broker上有一个commitLog,包含一个MappedFileQueue,其中又包含了MappedFile组成的一个有queue特点的CopyOnWriteArrayList。
  •    
  • 一个broker上的多个IndexFile其实是一个ArrayList里存放的,一个IndexFile包含一个MappedFile。
  •    
  • 一个broker上的多个ConsumeQueue其实是存放在一个map中(从一个topic中找到它下面的这个map),每个ConsumeQueue包含一个MappedFileQueue。


rocketmq源码分析、整体结构与类关系设计的思考
            
    
    博客分类: rocketmq 阿里巴巴中间件jms框架 

CommitLog的追加数据分析:
    当从commitLog通过haservice加数据是appendData(long, byte[])比较简单,因为只要和主机一致就行了。
    当主commitLog加消息数据时是putMessage(MessageExtBrokerInner),里面有一句:mappedFile.appendMessage(msg, this.appendMessageCallback),使用了callback。当安排下级办一件事时,过程中还要上级来协调/记录一些事情时,通常派一个内部类给下级,下级使用内部类,而内部类天生就指向了包含它的外部类。当然上级也可以把this传给一个普通类并派下去作为回调,比如DledgerRoleChangeHandler就是这样,不过给外部用的话,又有点监听的意思了,不过用法是相似的。

4.5 HaService分析

   这里选择messagestore的其中一个服务HaService,分析其设计

rocketmq源码分析、整体结构与类关系设计的思考
            
    
    博客分类: rocketmq 阿里巴巴中间件jms框架 

    1)功能:高可用服务,主要是实现主broker与备broker之间的数据同步的。
    2)使用位置:
Ha是属于defaultMessageStore的,与commitLog平级,但当commitLog在进行putMessages(MessageExtBatch)时,会通过defaultMessageStore使用ha来保证消息除了在主broker上存好,也要在备broker上存好,同步双写。
    3)设计简述
Ha既然属于defaultMessageStore,调用存贮功能的外部模块并不关心。平时的功能是主备broker上的数据同步。如果一个备用broker新上线,就会同步到主broker上的数据,与当前是否在处理新消息无关。
    4)类的设计:
    Ha要通过socket进行数据传输,服务端必然有一个serverSocketChannel接收连接,还有产生的与每一个客户端的socketChannel来进行数据读写,肯定要放在一个列表窗口中。客户端只需要一个连接后产生的socketChannel进行读写就可以了。

    服务端类:
    肯定有serverSocketChannel,而且必然在一个循环中处理客户端的连接,可以包装成一个服务。
    连接后产生的每一个socketChannel,但它只能用来读写数据,然而每一个客户端总有个性化的属性数据,所以每一个socketChannel都要封装起来成一个HAConnection,放在一个List容器中。
    每个HAConnection肯定要记录各自对接的客户端的offset。对其中的socketChannel进行读,读客户端的请求要多少数据。对其中的写肯定是把数据发过去。读写都是不断循环的服务。
    同步双写时,数据要保证也写到的从端,

    客户端类:
    肯定有一个连接产生socketChannel,并用它来进行读写数据。它需要定时向服务端请求数据,告诉自己目前数据到哪了。所以也是一个服务,都是循环进行了。
    最后,把服务端类与客户端类合并在一起,产生HaService,进行优化。

    5)设计感悟:
    参数不要碎片化:getHaSendHeartbeatInterval这个ha的参数,并没有从外部配置给haservice,而用的时候还是通过找到messageStore这个核心类,找到配置类中的值,说明参数不要碎片化传递,或者根本就不要传递,保留一个总引用即可。
    大过程分成小过程:主broker上写消息后,需要同时写到从broker上,很可能我们把这个包装在一个过程中。而实际上,写消息到主broker后,haservice进行不停的同步不用管,只需要一定时间内检查ha写从broker的offset是不是上已经>=这个消息时的offset了,就可以了,这个检查过程循环5次,第次最多等1秒,同步心跳也是5秒。这样一个大的任务,拆分成了小任务,写主并检查同步点的,主从同步的,相互不影响,只通过属性值进行判断。
    小过程之间的关联:这个检查从库offset的请求任务,自带countDownlatch,双写过程发出这个请求任务给haService的服务,并等待这个任务的结果。任务被检查后就触发countDown,这样双写过程就知道结果了,与futurn差不多。

4.6 生产者消费者的管理分析

    对于服务端,持有并维护所有的客户端是非常常用的功能,看看rocketmq是怎么做的:
rocketmq源码分析、整体结构与类关系设计的思考
            
    
    博客分类: rocketmq 阿里巴巴中间件jms框架 

    图片说明:
    1) brokerController还是核心中的核心,其它的模块都围绕着这个核心类引用着。
    2) 3个浅绿色的就是producer/consumer/filterserver Manager,除了filterserver有一个定时任务外,方法都是被深绿色的模块调用的。
    3) NettyRemotingServer被注入多个处理器processer,还有一个监听器clientHousekeepingService来监听channel的变化。当NettyRemotingServer接收外部的请求后,用相应的处理器处理,比如注册producer/conmsumer等,相应处理器会调用相应的manager的方法进行处理。注意都是通过brokerController调用的,不直接引用。当NettyRemotingServer中的channel本身的channel发生变化时,会通知监听器clientHousekeepingService,后者会让三个mananger发生相应的变化。
    4) consumerManager被监听器通知后,除了自己的动作外,它还会通知监听它的深黄色的consumerIdsChangerListener,而这个监听器又进一步引起另外两个浅黄色的对象发生变化。
    5) 即使是下级别的consumerIdsChangerListener也是被核心类brokerController引用,并在init时设置给别人用的。所以一切都以核心类为桥梁。

    设计思考:
    如果我来设计,三个manager是必须的,必然要管理连接的各种客户端。三个管理很可能每一个自己管理自己的channel的可用性。
    客户端的变化受通讯的消息的影响,也受意外的影响,比如通讯超时。所以两个原因造成管理的对象变化。所以消息处理会调用每个mananger,channel监听会调用manager。作者把这些统一到一个类housekeepingservice中,而我可能会多些类来处理。


4.7 消费者PULL拉消息的处理

    通常对消息的PULL拉取是客户端的主动请求,主动去查看有没有新的消息。
    推消息PUSH是监听回调,如果有消息到来,服务端主动把消息发到客户端。

    PULL一般就直接同步调用,但看到brokerController中有一个PullRequestHoldService,看名字是把请求都放在这里。看到在brokerAllowSuspend && hasSuspendFlag的配置下,如果没有直接找到消息ResponseCode.PULL_NOT_FOUND,这个请求要挂起。
    请求要按topic, queueId组合产生的key存在PullRequestHoldService的ConcurrentHashMap里面。PullRequestHoldService有循环检查这些req的请求offset与实际队列的offset,有消息的时候会通过核心类getPullMessageProcessor().executeRequestWhenWakeup,产生一个runnable对象作为发送查到的消息的任务,给一个线程池来执行。


五、与之前业务的对比
   
    之间做过一个业务,包含服务端与个数不定的客户端,通讯是公司内部开发的基于netty的消息通讯,有点类似,这里说说与基于spring的web应用对接。

    基本结构:

rocketmq源码分析、整体结构与类关系设计的思考
            
    
    博客分类: rocketmq 阿里巴巴中间件jms框架 

    说明:
    1) 核心类是一个单例,接收请求数据,进行拆分后分发到各个客户端进行处理。
    2) 内部有队列存请求业务,还线程池进行处理,还有监听连接情况的listener。同样也有维护客户端的类。
    3) 业务请求的持久化,最终数据的持久化等部分功能需要外部传入接口的实现类。

    4) 该模块作为Jar包嵌入在一个web容器中,通过http请求接收原始处理请求。除这个业务之外的外部类都由spring进行管理。
    5) 使用了一个component作为spring容器与这个核心类的桥梁,持有核心类同时也通过spring自动持有所有核心类要用的外部类。在component的afterpropetyset中进行核心类的初始化,并把所有外部要的类设置进去,最后启动核心类。
    6) web请求的业务数据,也通过这个component提交给核心类来处理。外部需要了解核心类的数据,也通过component获取。

    当时功能还不是非常多,总体上是通过核心类来处理业务的,但个别地方可能有直接引用造成不规范,看了rockmq的设计,可以参考进一步优化自己的组件。


六、总体感觉

    这是一个以功能为主的中间件,在类的关系上很清晰,没有太多的抽象类与接口,没有各种各样的实现。这个与dubbo有鲜明的对比。
    dubbo作为一个微核心,主功能并不多,不过都是抽象类与接口。整合了各种各样的实现方式,有静态的,有动态的实现,都缓存在extension中。就象一个主板兼容各种各样的CPU,MEM,硬盘,声卡,在已有的产品上做了很多适配器。但如果以常用的配置直接实现dubbo,相对应该是非常简化了。

    工程中的类关系的设计有时候类似于生活中的场景,比如一个公司的组织来处理内外业务,一个工厂厂房内如何加工物品。再想想很多算法,sql处理都与人工处理相似,再比如mapreduce算法与农场分散农民收获水果,并最后分类归并出来很类似。

    最近看到有人每一个action类对应一个validate类,而且还被模仿。说明开发人员没有很好的思考类之间的关系。比如各个销售部门进行合同审核,肯定统一找法务部门。validate肯定很很多被公共使用的东西,应该封装在一起。另外我之前常用医院业务举例,体会业务类,单例,各个科室排队等。现实生活中,由于社会化大生产分工细化,同时又组合成复杂的业务,这如同类的设计,有明确的责任,内聚,单一职责,又共同完成复杂的功能。所以设计项目时进行类比思考,非常有利于设计出良好的结构。

    平常的开发以实现功能为主,所以感觉rocketmq更有学习意义。而dubbo更锻炼抽象能力。
  • rocketmq源码分析、整体结构与类关系设计的思考
            
    
    博客分类: rocketmq 阿里巴巴中间件jms框架 
  • 大小: 227.1 KB
  • rocketmq源码分析、整体结构与类关系设计的思考
            
    
    博客分类: rocketmq 阿里巴巴中间件jms框架 
  • 大小: 140.1 KB
  • rocketmq源码分析、整体结构与类关系设计的思考
            
    
    博客分类: rocketmq 阿里巴巴中间件jms框架 
  • 大小: 218 KB
  • rocketmq源码分析、整体结构与类关系设计的思考
            
    
    博客分类: rocketmq 阿里巴巴中间件jms框架 
  • 大小: 181.7 KB
  • rocketmq源码分析、整体结构与类关系设计的思考
            
    
    博客分类: rocketmq 阿里巴巴中间件jms框架 
  • 大小: 220.2 KB
  • rocketmq源码分析、整体结构与类关系设计的思考
            
    
    博客分类: rocketmq 阿里巴巴中间件jms框架 
  • 大小: 50.2 KB
  • rocketmq源码分析、整体结构与类关系设计的思考
            
    
    博客分类: rocketmq 阿里巴巴中间件jms框架 
  • 大小: 284.3 KB
  • rocketmq源码分析、整体结构与类关系设计的思考
            
    
    博客分类: rocketmq 阿里巴巴中间件jms框架 
  • 大小: 185.9 KB