RocketMQ源码分析:Namesrv
一、下载源码
从github拉取rocketmq代码:https://github.com/apache/rocketmq.git,当前最新的版本是4.5.2.我用的是ideal,可以看到rocketmq源码结构如下:
rocketmq共包含9个模块,解释一下各个模块的功能和作用:
- rocketmq-common:通用的枚举、基类方法、或者数据结构,包名有admin、consumer、filter、hook、message
- rocketmq-remoting:使用netty的客户端、服务端,使用fastjson序列化,自定义二进制协议
- rocketmq-srvutil:只有一个ServerUtil类,只提供Server程序依赖,尽可能减少客户端依赖
- rocketmq-store:消息存储,索引,consumerLog,commitLog等
- rocketmq-client:消息发送和接收,包含consumer和producer
- rocketmq-filtersrv:消息过滤器
- rocketmq-broker:服务端,接受消息,存储消息,consumer拉取消息
- rocketmq-tools:命令行工具
- rocketmq-namesrv:NameServer,类似服务注册中心,broker在这里注册,consumer和producer在这里找到broker地址
可以看到rocketmq网络通讯是建立在netty上的,netty是一款高性能的异步非阻塞的网络通讯框架,很多开源的中间件(dubbo/rocketmq)网络通讯层都是基于netty开发的.
二、NameSrv源码详解
首先,看一下rocketmq-namesrv模块,代码量不多,一共8个类,从面子上可以看出启动入口:NamesrvStartup
核心功能
NameServer相当于配置中心,维护Broker集群、Broker信息、Broker存活信息、主题与队列信息等。Broker向它注册路由信息,同时Client向其获取路由信息。NameServer本身是没有状态的,NameServer彼此之间不通信,每个Broker与集群内所有的Nameserver保持长连接。如果使用过Zookeeper,就比较容易理解了,但是功能比Zookeeper弱。
NameServer定时任务每10s扫描一下失效的Broker,超过2分钟没有收到Broker的心跳包,则关闭连接 。
- NamesrvStartup:NamesrvStartup是一个启动类,主要逻辑都在处理命令行和配置
- NamesrvController:NameserController 是 NameServer 模块的核心控制类
- NamesrvConfig:主要指定 nameserver 的相关配置属性:
kvConfigPath(kvConfig.json)。
mqhome/namesrv/namesrv.properties。
orderMessageEnable,是否开启顺序消息功能,默认为false。 - KVConfigManager:读取或变更NameServer的配置属性,加载 NamesrvConfig 中配置的配置文件到内存,此类一个亮点就是使用轻量级的非线程安全容器,再结合读写锁对资源读写进行保护。尽最大程度提高线程的并发度。
- RouteInfoManager :NameServer 数据的载体,记录 Broker、Topic 等信息。
- BrokerHousekeepingService:实现 ChannelEventListener接口,可以说是通道在发送异常时的回调方法
NamesrvStartup启动器
NamesrvStartup是一个启动类,主要逻辑都在处理命令行和配置,主要功能都是在NamesrvController中;
public class NamesrvStartup {
private static InternalLogger log;
private static Properties properties = null;
private static CommandLine commandLine = null;
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
//初始化NamesrvController
NamesrvController controller = createNamesrvController(args);
// 启动NamesrvController
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
createNamesrvController:创建NamesrvController
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
// 根据命令行参数,使用commons-cli命令行工具包解析生成CommandLine对象
// 在parseCmdLine中,如果命令行中有-h选项,执行打印帮助文档的逻辑,然后退出,不再继续
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// 初始化了2个配置 NamesrvConfig,NettyServerConfig,其中NettyServerConfig监听9876是硬编码的
// 然后通过命令行参数 -c 指定一个配置文件,然后将配置文件中的内容解析成NamesrvConfig,NettyServerConfig的配置
// 设置NamesrvConfig,NettyServerConfig的逻辑是看类中的set方法,如果set方法后的名字和配置文件中的key匹配,就会设置对应的值
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
//如果指定了 -p 选项,会在控制台打印配置信息,然后退出,不再继续执行
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
//将启动命令行的参数配置设置到NamesrvConfig中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// 检查必须设置RocketMQHome
// 在NamesrvConfig中,可以看到使用系统属性rocketmq.home.dir,环境变量ROCKETMQ_HOME和前面的-c指定的配置文件设置RocketMQHome
// 在mqnamesrv启动脚本中会自定探测RockerMQ并export ROCKETMQ_HOME
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
// 使用logback打印NamesrvConfig,NettyServerConfig配置信息
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// 最后还把-c指定的文件的配置在保存到Configruation中
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
NamesrvController初始化完成后,就调用start(controller),才真正的开始:
// 启动NamesrvController
start(controller);
启动NamesrvController,start(controller)方法中有两个核心的方法:
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
//NamesrvController初始化
boolean initResult = controller.initialize();
//启动NamesrvController
controller.start();
return controller;
}
NamesrvController初始化:
public boolean initialize() {
// 从NamesrvConfig#KvConfigPath指定的文件中反序列化数据到KVConfigManager#configTable中
this.kvConfigManager.load();
// 启动网络通信的Netty服务
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 初始化一下负责处理Netty网络交互数据的线程池,
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注册一个默认负责处理Netty网络交互数据的DefaultRequestProcessor,这个Processor会使用remotingExecutor执行
// *划重点,后面这里会再次提到*
this.registerProcessor();
// 每10s扫描一下失效的Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 每10min打印一下前面被反复蹂躏的配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
启动NamesrvController:
public void start() throws Exception {
//启动netty
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
看看NettyRemotingServer的start()方法的具体实现:
public void start() {
// 初始化一个线程池,用于执行共享的Handler
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
prepareSharableHandlers();
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
// 这里有一个channelEventListener,在NamesrvController中channelEventListener就是BrokerHousekeepingService,BrokerHousekeepingService负责在broker断开连接的时候,移除RouteInfoManager中的路由信息
// NettyEventExecutor会维护一个NettyEvent的队列,NettyConnectManageHandler会向NettyEvent的队列中添加Event,然后由channelEventListener进行消费
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
// 定时扫描responseTable,执行超时请求的callback
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
整个过程大概如下: