欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RocketMq源码解读(一)

程序员文章站 2022-07-14 23:25:31
...

nameserver作为替换早期版本zookeeper的轻量级实现,它只实现了zk的一致性+发布订阅。NameSrv的一致性是通过每个Broker、Consumer、Producer定时心跳同步的,存在短暂的弱一致性。发布订阅时,Broker会与每一个NameSrv保持TCP连接,上传topic信息,自身的健康状态,filter信息等,Consumer和Producer也会与每一台NameSrv保持TCP连接,获取Topic的路由信息,包证负载均衡。本章主要介绍名字空间服务namesrv,

管理的数据结构归纳为

RocketMq源码解读(一)

具体的源码实现如下:

一、kvconfig包

1. KVConfigSerializeWrapper

这个类的主要作用是继承自远程序列化的包装,对远程的json序列化字符串做序列化和反序列化。

public abstract class RemotingSerializable {
    private final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
    // 序列化成byte传输
    public static byte[] encode(final Object obj);
    public byte[] encode();
    // 反序列化
    public static <T> T decode(final byte[] data, Class<T> classOfT);

    // json序列化
    public static String toJson(final Object obj, boolean prettyFormat)
    public static <T> T fromJson(String json, Class<T> classOfT);
}

2、KVConfigManager类

这个类的在内部维护了一个内存态的HashMap,存储的格式为

HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable;

基于此,对名字空间下的KV做操作处理。

public class KVConfigManager {
    
    private final NamesrvController namesrvController;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable = new HashMap<String, HashMap<String, String>>();

    // 获取默认名字空间下的所有KV,加载到内存中
    public void load();
    // 持久化到本地
    public void persist();
    public void putKVConfig(final String namespace, final String key, final String value);
    public void deleteKVConfig(final String namespace, final String key);
    
    public byte[] getKVListByNamespace(final String namespace);

    public String getKVConfig(final String namespace, final String key);
}

二、processor包

1、DefaultRequestProcessor

这个类主要的功能是对netty传输的命令做解析,并根据不同的命令处理不同的业务。首先封装接口,处理Netty请求,所有的命令都封装成RemotingCommand。

public interface NettyRequestProcessor {
    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception;

    boolean rejectRequest();
}

DefaultRequestProcessor实现了Command处理接口,对命令做解析处理。

public class DefaultRequestProcessor implements NettyRequestProcessor{

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    switch(request.getCode()){
        // 存放KV
        case RequestCode.PUT_KV_CONFIG:
        case RequestCode.GET_KV_CONFIG:
        case RequestCode.DELETE_KV_CONFIG:
        // 注册broker
        case RequestCode.REGISTER_BROKER:
        case RequestCode.UNREGISTER_BROKER:
        // 通过topic获取对应的路由信息
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
        case RequestCode.GET_BROKER_CLUSTER_INFO:
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
        // 获取名字服务上的topic list
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:

        case RequestCode.GET_TOPICS_BY_CLUSTER:
        ...
    }   
    }
}

public RemotingCommand putKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException{
    // 解析远程请求的头部信息
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final PutKVConfigRequestHeader requestHeader =
            (PutKVConfigRequestHeader)    
             request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class);
    // 根据头部信息所带的名字空间和KV,存入到内存HashMap中
    this.namesrvController.getKvConfigManager().putKVConfig(
            requestHeader.getNamespace(),
            requestHeader.getKey(),
            requestHeader.getValue()
        );
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

对于注册broker,其主要的方法是:

private RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    // 初始化返回对象
    final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
    final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
    // 解析头部信息
    final RegisterBrokerRequestHeader requestHeader =
            (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
    // 解析请求体
    TopicConfigSerializeWrapper topicConfigWrapper;
    if (request.getBody() != null) {
        topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
    } else {
        topicConfigWrapper = new TopicConfigSerializeWrapper();
        topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
        topicConfigWrapper.getDataVersion().setTimestamp(0);
    }
    // 调用路由管理器,注册broker
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
            requestHeader.getClusterName(),
            requestHeader.getBrokerAddr(),
            requestHeader.getBrokerName(),
            requestHeader.getBrokerId(),
            requestHeader.getHaServerAddr(),
            topicConfigWrapper,
            null,
            ctx.channel()
    );
    // 返回结构体中,会带着master的信息和Ha的信息
    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());

    byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(jsonValue);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

三、routeinfo包

1、对于broker和topic的管理,在路由包下做具体的实现,继续看注册broker方法的实现:

public class RouteInfoManager{
    // 借助读写锁
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // 一个topic可以有多个broker
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    // 一个broker名字下,可以包含多个broker
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    // 集群下,有多个brokerName
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    // broker的连接信息封装
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    // broker地址和过滤列表
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

// 注册broker的方法实现
    public RegisterBrokerResult registerBroker(
        final String clusterName,    // 集群名字
        final String brokerAddr,    // broker地址
        final String brokerName,    // brokerName
        final long brokerId,        // 分配的id
        final String haServerAddr,    // 高可用地址
        final TopicConfigSerializeWrapper topicConfigWrapper,    // 相关的topic序列化信息
        final List<String> filterServerList,        // 需要过滤的服务列表
        final Channel channel)            // netty的连接channel
    {
    
        RegisterBrokerResult result = new RegisterBrokerResult();
        try{
            // 这里try是因为锁是可以被中断的
            try{
                this.lock.writeLock().lockInterruptibly(); 
                // 获取集群下的brokerName信息,更新集群
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNamse.add(brokerName);
                boolean registerFirst = false;
                // 因为一个BrokerName,可以有多个broker
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData();
                    brokerData.setBrokerName(brokerName);
                    HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
                    brokerData.setBrokerAddrs(brokerAddrs);

                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                // 判断broker是不是首次注册
                registerFirst = registerFirst || (null == oldAddr);
                // 如果是master,并且topic信息有变动
                if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId){
                    // 如果是首次注册或者topic信息有变动
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst){
                        // 要更新topic对应的broker信息
                        ConcurrentHashMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }

                // 如果topic信息没有变动,或者不是master,更新链接信息
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel, haServerAddr));
                // 更新filtersrv信息
                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }
               // 如果不是master的broker,获取到master对应的Ha信息和master地址信息,返回到结果
               if (MixAll.MASTER_ID != brokerId){
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
               }
                
            }finally{this.lock.writeLock.unlock();}
        }catch(Exception e){}
        return result;
    }

    // 创建或者是更新topic对应的broker信息
    private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
        QueueData queueData = new QueueData();
        queueData.setBrokerName(brokerName);
        queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
        queueData.setReadQueueNums(topicConfig.getReadQueueNums());
        queueData.setPerm(topicConfig.getPerm());
        queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());

        List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
        if (null == queueDataList) {
            queueDataList = new LinkedList<QueueData>();
            queueDataList.add(queueData);
            this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
            log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData);
        } else {
            boolean addNewOne = true;

            Iterator<QueueData> it = queueDataList.iterator();
            while (it.hasNext()) {
                QueueData qd = it.next();
                if (qd.getBrokerName().equals(brokerName)) {
                    if (qd.equals(queueData)) {
                        addNewOne = false;
                    } else {
                        log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
                            queueData);
                        it.remove();
                    }
                }
            }

            if (addNewOne) {
                queueDataList.add(queueData);
            }
        }
    }

}

综上,注册一个broker的过程解析为:

  • 从远程请求中,解析出需要注册的broker信息,包括集群信息,broker地址,名字,id,Ha地址,topic信息
  • 根据集群名字,更新集群中的brokerName信息(一个brokerName对应多个broker)
  • 将broker信息加入到broker地址table中,更新地址列表信息
  • 如果broker是master并且topic信息不为空,判断topic信息有变动或者是首次注册,需要更新topic对应的broker列表
  • 更新broker的连接信息
  • 如果broker不是Master,将Master地址和Ha地址更新到连接信息中

2、因为一个topic可能对应多个broker,如何根据topic选取路由呢?具体的实现看pickupTopicRouteData方法

public TopicRouteData pickupTopicRouteData(final String topic){
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<>();
    List<BrokerData> brokerDataList = new LinkedList<>();
    topicRouteData.setBrokerDatas(brokerDataList);

    HashMap<String, List<String>> filterServerMap = new HashMap<>();
    topicRouteData.setFilterServerTable(filterServerMap);

    try{
        try{
            this.lock.readLock.lockInterruptibly();
            // 根据topic获取对应的broker连接信息
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            if(queueDataList != null){
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;
                Iterator<QueueData> it = queueDataList.iterator();
                while (it.hasNext()) {
                    QueueData qd = it.next();
                    brokerNameSet.add(qd.getBrokerName());
                }
            for (String brokerName : brokerNameSet) {
                 BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                 if (null != brokerData) {
                     BrokerData brokerDataClone = new BrokerData();
                     brokerDataClone.setBrokerName(brokerData.getBrokerName());
                     brokerDataClone.setBrokerAddrs((HashMap<Long, String>) brokerData.getBrokerAddrs().clone());
                     brokerDataList.add(brokerDataClone);
                     foundBrokerData = true;
                     for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                         List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                         filterServerMap.put(brokerAddr, filterServerList);
                      }
                  }
             }
            }
        }
    }
}

所以,根据topic最终获取到的数据结构体为:

public class TopicRouteData extends RemotingSerializable{
    /**
     * 顺序消息配置。
     * 格式为=》BrokerName1:QueueId1;BrokerName2:QueueId2;...BrokerNameN:QueueIdN
     */
    private String orderTopicConf;
    // 对应的broker上Queue列表信息
    private List<QueueData> queueDatas;
    // broker 地址信息
    private List<BrokerData> brokerDatas; 
    // broker地址和对应的filter信息
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;    
}

四、NameSrvStartUp和NamSrvController

1、NameSrvStartUp是服务启动入口,主要是初始化NamSrvController服务,并且添加服务退出的钩子函数,进行资源的释放。

2、NamSrvController主要是加载初始化配置信息,启动Netty服务,监听端口(默认9876)。

public class NamesrvController{
    public boolean initialize() {
    // 加载初始化配置信息
    this.kvConfigManager.load();
    // 初始化netty服务端
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // 远程服务线程池
    this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    // 注册服务处理器
    this.registerProcessor();

    // 10s扫描一次过期或者非活跃的broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
    
    // 10分钟打印一次配置信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
    return true;
}
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer{
    // 主要封装netty server的配置信息,
    public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener){
    }
    
    // 启动netty服务
    public void start(){
         this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(
                            defaultEventExecutorGroup,
                            new NettyEncoder(),
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            new NettyConnetManageHandler(),
                            new NettyServerHandler());
                    }
                });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }
        
        if (this.channelEventListener != null) {
            this.nettyEventExecuter.start();
        }

        // 每3秒检查一次服务请求是否超时,如果超时,进行连接释放和资源释放
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Exception e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
        
    }
}