RocketMq源码解读(一)
nameserver作为替换早期版本zookeeper的轻量级实现,它只实现了zk的一致性+发布订阅。NameSrv的一致性是通过每个Broker、Consumer、Producer定时心跳同步的,存在短暂的弱一致性。发布订阅时,Broker会与每一个NameSrv保持TCP连接,上传topic信息,自身的健康状态,filter信息等,Consumer和Producer也会与每一台NameSrv保持TCP连接,获取Topic的路由信息,包证负载均衡。本章主要介绍名字空间服务namesrv,
管理的数据结构归纳为
具体的源码实现如下:
一、kvconfig包
1. KVConfigSerializeWrapper
这个类的主要作用是继承自远程序列化的包装,对远程的json序列化字符串做序列化和反序列化。
public abstract class RemotingSerializable {
private final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
// 序列化成byte传输
public static byte[] encode(final Object obj);
public byte[] encode();
// 反序列化
public static <T> T decode(final byte[] data, Class<T> classOfT);
// json序列化
public static String toJson(final Object obj, boolean prettyFormat)
public static <T> T fromJson(String json, Class<T> classOfT);
}
2、KVConfigManager类
这个类的在内部维护了一个内存态的HashMap,存储的格式为
HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable;
基于此,对名字空间下的KV做操作处理。
public class KVConfigManager {
private final NamesrvController namesrvController;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable = new HashMap<String, HashMap<String, String>>();
// 获取默认名字空间下的所有KV,加载到内存中
public void load();
// 持久化到本地
public void persist();
public void putKVConfig(final String namespace, final String key, final String value);
public void deleteKVConfig(final String namespace, final String key);
public byte[] getKVListByNamespace(final String namespace);
public String getKVConfig(final String namespace, final String key);
}
二、processor包
1、DefaultRequestProcessor
这个类主要的功能是对netty传输的命令做解析,并根据不同的命令处理不同的业务。首先封装接口,处理Netty请求,所有的命令都封装成RemotingCommand。
public interface NettyRequestProcessor {
RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception;
boolean rejectRequest();
}
DefaultRequestProcessor实现了Command处理接口,对命令做解析处理。
public class DefaultRequestProcessor implements NettyRequestProcessor{
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
switch(request.getCode()){
// 存放KV
case RequestCode.PUT_KV_CONFIG:
case RequestCode.GET_KV_CONFIG:
case RequestCode.DELETE_KV_CONFIG:
// 注册broker
case RequestCode.REGISTER_BROKER:
case RequestCode.UNREGISTER_BROKER:
// 通过topic获取对应的路由信息
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
case RequestCode.GET_BROKER_CLUSTER_INFO:
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
// 获取名字服务上的topic list
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
case RequestCode.GET_TOPICS_BY_CLUSTER:
...
}
}
}
public RemotingCommand putKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException{
// 解析远程请求的头部信息
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final PutKVConfigRequestHeader requestHeader =
(PutKVConfigRequestHeader)
request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class);
// 根据头部信息所带的名字空间和KV,存入到内存HashMap中
this.namesrvController.getKvConfigManager().putKVConfig(
requestHeader.getNamespace(),
requestHeader.getKey(),
requestHeader.getValue()
);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
对于注册broker,其主要的方法是:
private RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
// 初始化返回对象
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
// 解析头部信息
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
// 解析请求体
TopicConfigSerializeWrapper topicConfigWrapper;
if (request.getBody() != null) {
topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
} else {
topicConfigWrapper = new TopicConfigSerializeWrapper();
topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
topicConfigWrapper.getDataVersion().setTimestamp(0);
}
// 调用路由管理器,注册broker
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
topicConfigWrapper,
null,
ctx.channel()
);
// 返回结构体中,会带着master的信息和Ha的信息
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
三、routeinfo包
1、对于broker和topic的管理,在路由包下做具体的实现,继续看注册broker方法的实现:
public class RouteInfoManager{
// 借助读写锁
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// 一个topic可以有多个broker
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// 一个broker名字下,可以包含多个broker
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// 集群下,有多个brokerName
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// broker的连接信息封装
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// broker地址和过滤列表
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
// 注册broker的方法实现
public RegisterBrokerResult registerBroker(
final String clusterName, // 集群名字
final String brokerAddr, // broker地址
final String brokerName, // brokerName
final long brokerId, // 分配的id
final String haServerAddr, // 高可用地址
final TopicConfigSerializeWrapper topicConfigWrapper, // 相关的topic序列化信息
final List<String> filterServerList, // 需要过滤的服务列表
final Channel channel) // netty的连接channel
{
RegisterBrokerResult result = new RegisterBrokerResult();
try{
// 这里try是因为锁是可以被中断的
try{
this.lock.writeLock().lockInterruptibly();
// 获取集群下的brokerName信息,更新集群
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNamse.add(brokerName);
boolean registerFirst = false;
// 因为一个BrokerName,可以有多个broker
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData();
brokerData.setBrokerName(brokerName);
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerData.setBrokerAddrs(brokerAddrs);
this.brokerAddrTable.put(brokerName, brokerData);
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
// 判断broker是不是首次注册
registerFirst = registerFirst || (null == oldAddr);
// 如果是master,并且topic信息有变动
if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId){
// 如果是首次注册或者topic信息有变动
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst){
// 要更新topic对应的broker信息
ConcurrentHashMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 如果topic信息没有变动,或者不是master,更新链接信息
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel, haServerAddr));
// 更新filtersrv信息
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
// 如果不是master的broker,获取到master对应的Ha信息和master地址信息,返回到结果
if (MixAll.MASTER_ID != brokerId){
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
}finally{this.lock.writeLock.unlock();}
}catch(Exception e){}
return result;
}
// 创建或者是更新topic对应的broker信息
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName);
queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
queueData.setReadQueueNums(topicConfig.getReadQueueNums());
queueData.setPerm(topicConfig.getPerm());
queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
if (null == queueDataList) {
queueDataList = new LinkedList<QueueData>();
queueDataList.add(queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData);
} else {
boolean addNewOne = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
if (qd.getBrokerName().equals(brokerName)) {
if (qd.equals(queueData)) {
addNewOne = false;
} else {
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
queueData);
it.remove();
}
}
}
if (addNewOne) {
queueDataList.add(queueData);
}
}
}
}
综上,注册一个broker的过程解析为:
- 从远程请求中,解析出需要注册的broker信息,包括集群信息,broker地址,名字,id,Ha地址,topic信息
- 根据集群名字,更新集群中的brokerName信息(一个brokerName对应多个broker)
- 将broker信息加入到broker地址table中,更新地址列表信息
- 如果broker是master并且topic信息不为空,判断topic信息有变动或者是首次注册,需要更新topic对应的broker列表
- 更新broker的连接信息
- 如果broker不是Master,将Master地址和Ha地址更新到连接信息中
2、因为一个topic可能对应多个broker,如何根据topic选取路由呢?具体的实现看pickupTopicRouteData方法
public TopicRouteData pickupTopicRouteData(final String topic){
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set<String> brokerNameSet = new HashSet<>();
List<BrokerData> brokerDataList = new LinkedList<>();
topicRouteData.setBrokerDatas(brokerDataList);
HashMap<String, List<String>> filterServerMap = new HashMap<>();
topicRouteData.setFilterServerTable(filterServerMap);
try{
try{
this.lock.readLock.lockInterruptibly();
// 根据topic获取对应的broker连接信息
List<QueueData> queueDataList = this.topicQueueTable.get(topic);
if(queueDataList != null){
topicRouteData.setQueueDatas(queueDataList);
foundQueueData = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
brokerNameSet.add(qd.getBrokerName());
}
for (String brokerName : brokerNameSet) {
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
BrokerData brokerDataClone = new BrokerData();
brokerDataClone.setBrokerName(brokerData.getBrokerName());
brokerDataClone.setBrokerAddrs((HashMap<Long, String>) brokerData.getBrokerAddrs().clone());
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
List<String> filterServerList = this.filterServerTable.get(brokerAddr);
filterServerMap.put(brokerAddr, filterServerList);
}
}
}
}
}
}
}
所以,根据topic最终获取到的数据结构体为:
public class TopicRouteData extends RemotingSerializable{
/**
* 顺序消息配置。
* 格式为=》BrokerName1:QueueId1;BrokerName2:QueueId2;...BrokerNameN:QueueIdN
*/
private String orderTopicConf;
// 对应的broker上Queue列表信息
private List<QueueData> queueDatas;
// broker 地址信息
private List<BrokerData> brokerDatas;
// broker地址和对应的filter信息
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
四、NameSrvStartUp和NamSrvController
1、NameSrvStartUp是服务启动入口,主要是初始化NamSrvController服务,并且添加服务退出的钩子函数,进行资源的释放。
2、NamSrvController主要是加载初始化配置信息,启动Netty服务,监听端口(默认9876)。
public class NamesrvController{
public boolean initialize() {
// 加载初始化配置信息
this.kvConfigManager.load();
// 初始化netty服务端
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 远程服务线程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注册服务处理器
this.registerProcessor();
// 10s扫描一次过期或者非活跃的broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 10分钟打印一次配置信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
return true;
}
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer{
// 主要封装netty server的配置信息,
public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener){
}
// 启动netty服务
public void start(){
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnetManageHandler(),
new NettyServerHandler());
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecuter.start();
}
// 每3秒检查一次服务请求是否超时,如果超时,进行连接释放和资源释放
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Exception e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
}
上一篇: javascript的调试技巧
下一篇: 推荐几个不错的console调试技巧
推荐阅读
-
解读真正的赵氏孤儿,赵氏孤儿真的是“正义”的一方吗?
-
仿Aspnetpager的一个PHP分页类代码 附源码下载
-
Spring MVC源码(一) ----- 启动过程与组件初始化
-
网站优化实例教程 解读网站搜索引擎和快照的一些问题
-
Mybaits 源码解析 (十)----- 全网最详细,没有之一:Spring-Mybatis框架使用与源码解析
-
走进PC第一厂:联想解读智慧PC制造 推进国家创新
-
九、Spring之BeanFactory源码分析(一)
-
Mybaits 源码解析 (八)----- 全网最详细,没有之一:结果集 ResultSet 自动映射成实体类对象(上篇)
-
Mybaits 源码解析 (九)----- 全网最详细,没有之一:一级缓存和二级缓存源码分析
-
一个可分页的基于文本的PHP留言板源码第1/2页