欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

elasticsearch discovery模块启动通信过程

程序员文章站 2022-07-14 17:59:51
...

elasticsearch是由Elasticsearch类的main方法启动的,首先会读取系统环境变量和虚拟机参数进行配置,参数环境配置后由Bootstrap类init方法负责主要的启动过程。查看的是v5.3.1的es源码
(1)、通过Enviroment的环境构建Node节点资源,加载插件,设置类的注入及绑定关系,有了Node实例,调用start方法后Node生命周期开始启动,通过injector.getInstance()方法是从容器中拿到相应服务的实例,启动或设置相关的参数、关联关系后启动,然后每个服务都做好本职工作,运转起来。

(2)、DiscoveryModule加载用于节点发现的相关类,节点发现的类型包括zen, none,zen对应着ZenDiscovery实现类,none对应NoneDiscovery,默认是用ZenDiscovery用于节点发现,ZenDiscovery利用UnicastZenPing将ping请求发送到各节点,不再有默认的MulticastZenPing实现,而是多了几个云平台节点发现的机制(aws, gce等,以插件形式提供),如下图所示:
elasticsearch discovery模块启动通信过程
ZenDiscovery 是节点发现的主类,结构图如下:
elasticsearch discovery模块启动通信过程
ZenDiscovery主要依赖以下几个类实现节点发现功能:
1)MembershipAction:负责节点的加入集群处理和离开集群处理
2)UnicastZenPing:ZenPing的单播实现,负责ping请求的发送和数据接收(es中ping命令和其它命令一样是集群内部通信的一种形式,是es自己实现的应用层协议,每个命令都有对应的actino_name,比如UnicastZenPing的ping命令对应的协议为:internal:discovery/zen/unicast,与windows中的ping命令无关联)
3)PublishClusterStateAction:主节点通过PublishClusterStateAction将新的集群状态发送给其他从节点,从节点通过PublishClusterStateAction实现新的集群状态的更新
4)MasterFaultDetection:主节点和从节点都会运行,从节点会周期性发送ping请求(action name: internal:discovery/zen/fd/master_ping)监测主节点
5)NodesFaultDetection:只运行在主节点上,主节点会周期性发送ping请求监测所有从节点(action name: internal:discovery/zen/fd/ping)
6)ClusterService:集群服务主类,负责维护集群的状态和执行相应Task任务,通过ClusterService可以获取当前集群状态ClusterState,管理ClusterStateApplier、ClusterStateListener,当集群状态发生改变时提交StateUpdateTask利用ClusterStateApplier 更新状态,根据处理的结果调用相应的ClusterStateListener方法
7)JoinThreadControl和NodeJoinController:节点发现、加入请求的控制类,JoinThreadControl的startNewThreadIfNotRunning方法是加入集群、选举行为启动的主要方法,会被多处调用,而NodeJoinController控制着选举是否成功、节点加入集群请求等控制处理,其中waitToBeElectedAsMaster方法等待足够多的最小候选主节点(discovery.zen.minimum_master_nodes配置参数)加入集群后,选举才是成功,否则失败后需重新选举,handleJoinRequest方法处理节点加入请求,如果在选举期间请求会被堆积到队列里,等待选举成功后处理
8)TransportService:内部通信主类,负责请求的发送,响应的处理,registerRequestHandler方法注册TransportRequestHandler请求的响应处理类,通过sendRequest方法将特定的命令发送到特型的节点
9)ThreadPool:每个节点主要的线程池,分为不同的种类,通过字符串来进行标识(same、generic、get、index、search),分别适用于不同的场景
10)ElectMasterService:选主服务的主类,minimumMasterNodes的维护,选举的排序规则,以及选举出的主节点都由此类负责
主要相关配置参数说明(elasticsearch.yml文件中配置),下面会用到:
node.master:是否可做为主节点,默认为true
node.data:是否可作为数据节点,默认为true
discovery.zen.ping_timeout:ping请求超时时间,默认3秒
discovery.zen.join_timeout:加入集群的超时时间,默认ping请求超时时间的20倍
discovery.zen.join_retry_attempts:加入集群重试次数,默认3次
discovery.zen.minimum_master_nodes:组成一个集群所需要的最小候选主节点个数,默认为1
discovery.zen.master_election.wait_for_joins_timeout:选举出主节点后等待最小候选主节点加入集群的超时时间,默认加入集群的超时时间的一半
discovery.zen.publish_diff.enable:是否启用集群状态增量更新,默认为true
(3)、Node节点启动过程中会从Guice容器中得到Discovery接口的实例ZenDiscovery,设置AllocationService,AllocationService负责管理集群中节点资源的分配,管理AllocationDeciders以决定哪些节点适合用于切片的分配,还负责节点的加入管理和切片的重新分配。
(4)、DiscoveryNode代表集群中的节点资源,包括主节点和从节点,其中isMasterNode方法用于判断是否是候选主节点(node.master配置为true),isDataNode(node.data配置为true )判断是否可用于存储数据,DiscoveryNodes保存集群中所有的DiscoveryNode资源,其中masterNodeId属性保存主节点的id信息,localNodeId属性保存本节点的id信息,DiscoveryNode对象是从配置的连接字符串构建而来
(5)、elasticserach中很多资源都继承自AbstractLifecycleComponent抽象类,代表具有生命周期属性的集群资源,ZenDiscovery也属于此类,可以启动,关闭,状态转移等,ZenDiscovery服务的启动主要包括discovery.start()和discovery.startInitialJoin()两个方法,其他服务启动起来之后Node即启动完成,并运行一个保活线程(通过CountDownLatch的wait()保持阻塞)
(6)、随后通过ClusterStateObserver监测集群状态的变化,节点变化的条件是DiscoveryNodes 对象中masterNodeId不为null,即有主节点诞生,即(5)中discovery的启动后已经选举出了主节点信息
(7)、下面主要是(5)中节点启动通信并选举的过程,主要涉及Discovery服务两个方法,即discovery.start() 和discovery.startInitialJoin() :
先对集群状态的更新,即clusterService.submitStateUpdateTask方法的执行做一下说明(以主节点的视角): 节点加入集群由JoinThreadControl类发起,具体任务的执行及相应的操作即clusterService.submitStateUpdateTask方法的调用都会提交给ThreadPool线程池处理,任务会被封装成UpdateTask,只有主节点才能发布集群状态更新任务,从节点通过PublishClusterStateAction类中的 SendClusterStateRequestHandler和CommitClusterStateRequestHandler来处理主节点下发的任务和需要提交的任务。UpdateTask本身是一个支持优先级的Runnable类,包含了需要执行的任务、任务执行的回调信息ClusterStateTaskListener(任务执行失败回调其onFailure方法,主节点退位了回调onNoLongerMaster方法,状态被正常处理了回调clusterStateProcessed方法)及ClusterStateTaskExecutor执行器。执行任务的时候是取得ClusterStateTaskExecutor去执行它对应的任务,在这里是JoinTaskExecutor类的execute方法被调用,输入数据被封装到TaskInputs类中,TaskInputs包含之前集群的状态ClusterState和任务列表(List节点列表数据),更新主要涉及排除冲突的节点(要加入集群的节点已存在,则删除已存在的节点,接受新节点),将从节点们都加入DiscoveryNodes中,生成新的ClusterState和每个任务执行结果,得到TaskOutputs,随后会构建一个ClusterChangedEvent集群状态改变事件,事件中包含了之前集群的状态、更新后集群的状态和增量信息(本节点nodeId,前任主节点,现任主节点,新加的节点,删除的节点),通过NodeConnectionsService与从节点们建立连接,将集群信息发布出去,在这里是PublishClusterStateAction类中的publish方法被调用,默认发布增量更新,出错时重发全量更新,transportService.sendRequest负责数据传输,状态发布的action name为internal:discovery/zen/publish/send,从节点收到数据后存到PendingClusterStatesQueue队列中并发送Empty响应,每有一个从节点成功响应了,就会累加计数器,并且把响应的节点暂存起来,直到有最小候选主节点成功响应后,对这些已经响应了的和还没响应的从节点下发提交任务,提交任务的action name为internal:discovery/zen/publish/commit,随后主节点更新自己的ClusterState对象,从节点收到提交任务后处理PendingClusterStatesQueue队列中的数据,在这里是ZenDiscovery类中内部类NewPendingClusterStateListener的onNewClusterState方法被调用,才开始处理等待中的任务,更新ClusterState集群状态,在这里从节点还会启动MasterFaultDetection服务,开始对主节点进行周期性监测。集群状态更新完毕。
discovery.start()主要初始化NodeJoinController(选举控制),NodeRemovalClusterStateTaskExecutor(节点删除),NodesFaultDetection(从节点监测),UnicastZenPing(ZenPing的默认实现,发送和处理ping请求 )等相关类信息
discovery.startInitialJoin()开始真正节点通信的过程,通过ClusterService提交StateUpdateTask状态更新任务,任务需要执行的操作主要是JoinThreadControl的startNewThreadIfNotRunning()方法的调用,该方法触发加入集群及后续选举操作,在失败出错等情况下会被再次调用,重新开始
首先要找到主节点,通过配置的连接主机ip和端口信息得到集群中的所有节点信息,封装到PingingRound类中并发送ping请求到其他节点,响应数据会被添加至PingCollection类中,PingCollection 包含每个节点的响应信息,共发送3次ping请求,第一次立即执行,第二次是ping超时的1/3的时间后,第三次是ping超时的2/3的时间后,这样就完成了ping请求
PingCollection 中的每个PingResponse都包含了节点自己的信息和它认为的主节点信息。遍历PingResponse ,得到主节点集合用C1标记(PingResponse 中的master属性值)和候选主节点集合用C2标记 (PingResponse 中的node属性值,isMasterNode 属性为true的节点即为候选主节点 )
如果C1不为空,排序后选择第一个节点作为主节点,排序规则是:isMasterNode=true的节点靠前,isMasterNode=false的节点靠后,再根据节点nodeId递增排序;如果C1为空,则要查看C2是否有最小主节点的个数,即组成一个集群所需要的最小可作为主节点的个数(满足isMasterNode=true的DiscoveryNode ),满足的话先按照节点的clusterStateVersion递减排序,再根据上述规则排序,选择第一个节点作为主节点,这个过程会一会持续到找到主节点为止。多个节点选举,同一套选举规则,所以每个节点的选举结果也是一致的。
如果自己被选举为了主节点,这时候还不能真正成为主节点,还需等待足够多的候选主节点发送加入请求,其他从节点的加入请求会被暂存到队列中,直到队列大小满足discovery.zen.minimum_master_nodes配置的个数要求(主节点自己算一个),满足后就会关闭选举这一上下文环境,即选举结束,正式成为主节点(其实elasticsearch是基于对等架构的,主节点并不是比从节点更为重要,但是需要一个主节点管理集群的状态,在集群拓扑发生改变时做出反应),下面要给从节点们加入集群的请求发送响应,另外执行BECOME_MASTER_TASK和FINISH_ELECTION_TASK(即成为主节点,选举结束了),通过ClusterService提交StateUpdateTask状态更新任务
如果没有被选为主节点,则与主节点建立连接,发送join请求,action name为internal:discovery/zen/join,待主节点响应后加入集群, 如果出错或主节点响应的信息不对则重新开始,ZenDiscovery类startNewThreadIfNotRunning方法会被执行
(8)主从相互监测说明:
ZenDiscovery通过NodesFaultDetection类监测节点状态并处理,同样主节点的监测由MasterFaultDetection负责,NodesFaultDetection和MasterFaultDetection会维护一个或多个Listener,当ping请求失败时做出相应的处理。
主节点通过NodesFaultDetection检测其他节点的状态,ping超时并重试后(discovery.zen.fd.ping_retries配置,默认3次)还没反应则主节点提交删除该节点任务

private void removeNode(final DiscoveryNode node, final String source, final String reason) {
        clusterService.submitStateUpdateTask(
                source + "(" + node + "), reason(" + reason + ")",
                new NodeRemovalClusterStateTaskExecutor.Task(node, reason),
                ClusterStateTaskConfig.build(Priority.IMMEDIATE),
                nodeRemovalExecutor,
                nodeRemovalExecutor);
    }

从节点通过MasterFaultDetection检测主节点的状态,ping超时并重试后(discovery.zen.fd.ping_retries配置,默认3次) 还没反应则通过ClusterService提交主节点已宕机任务,调用JoinThreadControl.startNewThreadIfNotRunning()重新开始选举

private void handleMasterGone(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
        if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a master failure
            return;
        }
        if (localNodeMaster()) {
            // we might get this on both a master telling us shutting down, and then the disconnect failure
            return;
        }
        logger.info((Supplier<?>) () -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);
        clusterService.submitStateUpdateTask("master_failed (" + masterNode + ")", newLocalClusterUpdateTask(Priority.IMMEDIATE) {
            @Override
            public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
                if (!masterNode.equals(currentState.nodes().getMasterNode())) {
                    // master got switched on us, no need to send anything
                    return unchanged();
                }
                // flush any pending cluster states from old master, so it will not be set as master again
                publishClusterState.pendingStatesQueue().failAllStatesAndClear(newElasticsearchException("master left [{}]", reason));
                return rejoin(currentState, "master left (reason = " + reason + ")");
            }
            @Override
            public void onFailure(String source, Exception e) {
                logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
            }
        });
    }
protected ClusterStateTaskExecutor.ClusterTasksResult<LocalClusterUpdateTask> rejoin(ClusterState clusterState, String reason) {
        // *** called from within an cluster state update task *** //
        assert Thread.currentThread().getName().contains(ClusterService.UPDATE_THREAD_NAME);
        logger.warn("{}, current nodes: {}", reason, clusterState.nodes());
        nodesFD.stop();
        masterFD.stop(reason);
        // TODO: do we want to force a new thread if we actively removed the master? this is to give a full pinging cycle
        // before a decision is made.
        joinThreadControl.startNewThreadIfNotRunning();
        return LocalClusterUpdateTask.noMaster();
    }

脑裂问题及解决方法可以参考:https://segmentfault.com/a/1190000004504225