欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

深入浅出Zookeeper之五 Leader选举

程序员文章站 2022-05-07 15:41:39
...

前面几篇文章简单介绍了zookeeper的单机server client处理。接下来几篇文章会介绍分布式部署下zookeeper的实现原理。我们假设有3台server的集群,zoo.cfg配置如下

tickTime=2000
dataDir=/home/admin/zk-data
clientPort=2181
#Learner初始化连接到Leader的超时时间
initLimit=10
#Learner和Leader之间消息发送,响应的超时时间
syncLimit=5
#集群配置,3台机器,2888为Leader服务端口,3888为选举时所用的端口
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888

 在server.1的$dataDir下

echo '1'>myid

启动server.1

./zkServer.sh start

 分析之前先看看选举相关的类图

深入浅出Zookeeper之五 Leader选举

 

入口函数QuorumPeerMain主线程启动

 

public void runFromConfig(QuorumPeerConfig config) throws IOException {
      ......
  
      LOG.info("Starting quorum peer");
      try {
		//对client提供读写的server,一般是2181端口
          ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
          cnxnFactory.configure(config.getClientPortAddress(),
                                config.getMaxClientCnxns());
  		//zk的逻辑主线程,负责选举,投票等
          quorumPeer = new QuorumPeer();
          quorumPeer.setClientPortAddress(config.getClientPortAddress());
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                      new File(config.getDataLogDir()),
                      new File(config.getDataDir())));
		//集群机器地址
          quorumPeer.setQuorumPeers(config.getServers());
          quorumPeer.setElectionType(config.getElectionAlg());
		//本机的集群编号
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
		//投票决定方式,默认超过半数就通过
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setLearnerType(config.getPeerType());
  		//启动主线程
          quorumPeer.start();
          quorumPeer.join();
      } catch (InterruptedException e) {
          // warn, but generally this is ok
          LOG.warn("Quorum Peer interrupted", e);
      }
    }
 QuorumPeer复写Thread.start方法,启动

 

 

    @Override
    public synchronized void start() {
	//恢复DB,从zxid中回复epoch变量,代表投票轮数
        loadDataBase();
	//启动针对client的IO线程
        cnxnFactory.start();
	//选举初始化,主要是从配置获取选举类型        
        startLeaderElection();
	//启动
        super.start();
    }
 loadDataBase过程,恢复epoch数

 

 

private void loadDataBase() {
		try {
		//从本地文件恢复db
            zkDb.loadDataBase();

            // load the epochs
		//从最新的zxid恢复epoch变量,zxid64位,前32位是epoch值,后32位是zxid
            long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
    		long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
            try {
            	currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
            } catch(FileNotFoundException e) {
            	// pick a reasonable epoch number
            	// this should only happen once when moving to a
            	// new code version
            	currentEpoch = epochOfZxid;
            	LOG.info(CURRENT_EPOCH_FILENAME
            	        + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
            	        currentEpoch);
            	writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
            }
            if (epochOfZxid > currentEpoch) {
            	throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
            }
 .......
	}
 选举初始化

 

 

synchronized public void startLeaderElection() {
    	try {
		//先投自己
    		currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    	} catch(IOException e) {
    		RuntimeException re = new RuntimeException(e.getMessage());
    		re.setStackTrace(e.getStackTrace());
    		throw re;
    	}
	//从配置中拿自己的选举地址
        for (QuorumServer p : getView().values()) {
            if (p.id == myid) {
                myQuorumAddr = p.addr;
                break;
            }
        }
        ......
	//根据配置,获取选举算法
        this.electionAlg = createElectionAlgorithm(electionType);
    }
 获取选举算法,默认为FastLeaderElection算法

 

 

protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;
                
        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 0:
            le = new LeaderElection(this);
            break;
        case 1:
            le = new AuthFastLeaderElection(this);
            break;
        case 2:
            le = new AuthFastLeaderElection(this, true);
            break;
        case 3:
		//leader选举IO负责类
            qcm = new QuorumCnxManager(this);
            QuorumCnxManager.Listener listener = qcm.listener;
            	//启动已绑定3888端口的选举线程,等待集群其他机器连接
	    if(listener != null){
                listener.start();
		//基于TCP的选举算法
                le = new FastLeaderElection(this, qcm);
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }
 FastLeaderElection初始化

 

 

    private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        proposedLeader = -1;
        proposedZxid = -1;
	//业务层发送队列,业务对象ToSend
        sendqueue = new LinkedBlockingQueue<ToSend>();
	//业务层接受队列,业务对象Notificataion
        recvqueue = new LinkedBlockingQueue<Notification>();
	//
        this.messenger = new Messenger(manager);
    }
Messenger(QuorumCnxManager manager) {
		//启动业务层发送线程,将消息发给IO负责类QuorumCnxManager
            this.ws = new WorkerSender(manager);

            Thread t = new Thread(this.ws,
                    "WorkerSender[myid=" + self.getId() + "]");
            t.setDaemon(true);
            t.start();
		//启动业务层接受线程,从IO负责类QuorumCnxManager接受消息
            this.wr = new WorkerReceiver(manager);

            t = new Thread(this.wr,
                    "WorkerReceiver[myid=" + self.getId() + "]");
            t.setDaemon(true);
            t.start();
        }
 QuorumPeer线程启动

 

 

run(){
.......
try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
		//如果状态是LOOKING,则进入选举流程
                case LOOKING:
                    LOG.info("LOOKING");

                    ......
                        try {
				//选举算法开始选举,主线程可能在这里耗比较长时间
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }
                    }
                    break;
		//其他流程处理		
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        setObserver(makeObserver(logFactory));
                        observer.observeLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e );                        
                    } finally {
                        observer.shutdown();
                        setObserver(null);
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                case FOLLOWING:
                    try {
                        LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        follower.shutdown();
                        setFollower(null);
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        setLeader(null);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                }
.......
}
 进入选举流程

 

 

public Vote lookForLeader() throws InterruptedException {
......
        try {
		//收到的投票
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
		
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = finalizeWait;

            synchronized(this){
                logicalclock++;
		//先投给自己
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
		//发送投票,包括发给自己
            sendNotifications();

            /*
             * Loop in which we exchange notifications until we find a leader
             */
		//主循环,直到选出leader
            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
		//从IO线程里拿到投票消息,自己的投票也在这里处理
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
		//如果空闲
                if(n == null){
			//消息发完了,继续发送,一直到选出leader为止
                    if(manager.haveDelivered()){
                        sendNotifications();
                    } else {
			//消息还在,可能其他server还没启动,尝试连接
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
			//延长超时时间
                    int tmpTimeOut = notTimeout*2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval?
                            tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                }
		//收到了投票消息
                else if(self.getVotingView().containsKey(n.sid)) {
                    /*
                     * Only proceed if the vote comes from a replica in the
                     * voting view.
                     */
                    switch (n.state) {
			//LOOKING消息,则
                    case LOOKING:
			......
			//检查下收到的这张选票是否可以胜出,依次比较选举轮数epoch,事务zxid,服务器编号server id
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
				//胜出了,就把自己的投票修改为对方的,然后广播消息
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        ......
			//添加到本机投票集合,用来做选举终结判断
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
			
			//选举是否结束,默认算法是超过半数server同意
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock, proposedEpoch))) {

                            ......
				//修改状态,LEADING or FOLLOWING
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());
				//返回最终的选票结果
                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
			//如果收到的选票状态不是LOOKING,比如这台机器刚加入一个已经服务的zk集群时
			//OBSERVING机器不参数选举
                    case OBSERVING:
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
			//这2种需要参与选举
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        if(n.electionEpoch == logicalclock){
				//同样需要加入到本机的投票集合
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
				//投票是否结束,如果结束,再确认LEADER是否有效
				//如果结束,修改自己的状态并返回投票结果
                            if(termPredicate(recvset, new Vote(n.leader,
                                            n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                            && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /**
                         * Before joining an established ensemble, verify that
                         * a majority are following the same leader.
                         */
                        outofelection.put(n.sid, new Vote(n.leader, n.zxid,
                                n.electionEpoch, n.peerEpoch, n.state));
                        ......
                        break;
                    default:
 选举消息发送

 

 

    private void sendNotifications() {
	//循环发送
        for (QuorumServer server : self.getVotingView().values()) {
            long sid = server.id;
		//消息实体
            ToSend notmsg = new ToSend(ToSend.mType.notification,
                    proposedLeader,
                    proposedZxid,
                    logicalclock,
                    QuorumPeer.ServerState.LOOKING,
                    sid,
                    proposedEpoch);
......
		//添加到业务的发送队列,该队列会被WorkerSender消费
            sendqueue.offer(notmsg);
        }
    }
 WorkerSender消费

 

 

            public void run() {
                while (!stop) {
                    try {
                        ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                        if(m == null) continue;

                        process(m);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
                LOG.info("WorkerSender is down");
            }

            private void process(ToSend m) {
		//选票协议是固定的
                byte requestBytes[] = new byte[36];
                ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);

                /*
                 * Building notification packet to send
                 */

                requestBuffer.clear();
                requestBuffer.putInt(m.state.ordinal());
                requestBuffer.putLong(m.leader);
                requestBuffer.putLong(m.zxid);
                requestBuffer.putLong(m.electionEpoch);
                requestBuffer.putLong(m.peerEpoch);
		//通过QuorumCnxManager这个IO负责类发送消息
                manager.toSend(m.sid, requestBuffer);

            }
 QuorumCnxManager具体发送

 

 

    public void toSend(Long sid, ByteBuffer b) {
        /*
         * If sending message to myself, then simply enqueue it (loopback).
         */
	//如果是自己,不走网络,直接添加到本地接受队列
        if (self.getId() == sid) {
             b.position(0);
             addToRecvQueue(new Message(b.duplicate(), sid));
            /*
             * Otherwise send to the corresponding thread to send.
             */
	//否则,先添加到发送队列,然后尝试连接,连接成功则给每台server启动发送和接受线程
        } else {
             /*
              * Start a new connection if doesn't have one already.
              */
             if (!queueSendMap.containsKey(sid)) {
                 ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
                         SEND_CAPACITY);
                 queueSendMap.put(sid, bq);
                 addToSendQueue(bq, b);

             } else {
                 ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                 if(bq != null){
                     addToSendQueue(bq, b);
                 } else {
                     LOG.error("No queue for server " + sid);
                 }
             }
             connectOne(sid);
                
        }
    }
 尝试连接过程

 

 

synchronized void connectOne(long sid){
        if (senderWorkerMap.get(sid) == null){
		.......
		//对方的选举地址,3888端口
                electionAddr = self.quorumPeers.get(sid).electionAddr;
            .......
		//同步IO连接
                Socket sock = new Socket();
                setSockOpts(sock);
                sock.connect(self.getView().get(sid).electionAddr, cnxTO);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Connected to server " + sid);
                }
		//连上了,初始化IO线程
                initiateConnection(sock, sid);
            ......
    }
 由于这个时候只有server.1启动,当它尝试去连接其他server时,会报错,选举线程会一直重试。此时,server.1只收到了自己的选票。然后我们启动server.2,server.2也会主动去连接server.1,这个时候server.1h和server.2会相互发起连接,但最终只有有一个连接成功,请看下问。

 

这个时候被连接的server的Listener选举线程会收到新连接

Listener主循环,接受连接

 

while (!shutdown) {
                        Socket client = ss.accept();
                        setSockOpts(client);
                        LOG.info("Received connection request "
                                + client.getRemoteSocketAddress());
                        receiveConnection(client);
                        numRetries = 0;
                    }
 新连接处理

 

 

 public boolean receiveConnection(Socket sock) {
        Long sid = null;
        
        try {
            // Read server id
		//读server id
            DataInputStream din = new DataInputStream(sock.getInputStream());
            sid = din.readLong();
            ......
        
        //If wins the challenge, then close the new connection.
	//如果对方id比我小,则关闭连接,只允许大id的server连接小id的server
        if (sid < self.getId()) {
            /*
             * This replica might still believe that the connection to sid is
             * up, so we have to shut down the workers before trying to open a
             * new connection.
             */
            SendWorker sw = senderWorkerMap.get(sid);
            if (sw != null) {
                sw.finish();
            }

            /*
             * Now we start a new connection
             */
            LOG.debug("Create new connection to server: " + sid);
            closeSocket(sock);
            connectOne(sid);

            // Otherwise start worker threads to receive data.
        } 
	//如果对方id比我大,允许连接,并初始化单独的IO线程
	else {
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, sid, sw);
            sw.setRecv(rw);

            SendWorker vsw = senderWorkerMap.get(sid);
            
            if(vsw != null)
                vsw.finish();
            
            senderWorkerMap.put(sid, sw);
            
            if (!queueSendMap.containsKey(sid)) {
                queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                        SEND_CAPACITY));
            }
            
            sw.start();
            rw.start();
            
            return true;    
        }
        return false;
    }
 连上后,自己server的IO线程初始化

 

 

public boolean initiateConnection(Socket sock, Long sid) {
        DataOutputStream dout = null;
        try {
            // Sending id and challenge
		//先发一个server id
            dout = new DataOutputStream(sock.getOutputStream());
            dout.writeLong(self.getId());
            dout.flush();
        } 
        ......
        // If lost the challenge, then drop the new connection
	//如果对方id比自己大,则关闭连接,这样导致的结果就是大id的server才会去连接小id的server,避免连接浪费
        if (sid > self.getId()) {
            LOG.info("Have smaller server identifier, so dropping the " +
                     "connection: (" + sid + ", " + self.getId() + ")");
            closeSocket(sock);
            // Otherwise proceed with the connection
        } 
	//如果对方id比自己小,则保持连接,并初始化单独的发送和接受线程
	else {
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, sid, sw);
            sw.setRecv(rw);

            SendWorker vsw = senderWorkerMap.get(sid);
            
            if(vsw != null)
                vsw.finish();
            
            senderWorkerMap.put(sid, sw);
            if (!queueSendMap.containsKey(sid)) {
                queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                        SEND_CAPACITY));
            }
            
            sw.start();
            rw.start();
            
            return true;    
            
        }
        return false;
    }
 通过以上的连接处理,每2台选举机器之间只会建立一个选举连接。

 

IO发送线程SendWorker启动,开始发送选举消息

 

try {
                while (running && !shutdown && sock != null) {

                    ByteBuffer b = null;
                    try {
			//每个server一个发送队列
                        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
                                .get(sid);
                        if (bq != null) {
				//拿消息
                            b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                        } else {
                            LOG.error("No queue of incoming messages for " +
                                      "server " + sid);
                            break;
                        }

                        if(b != null){
				//发消息
                            lastMessageSent.put(sid, b);
                            send(b);
                        }
                    } catch (InterruptedException e) {
                        LOG.warn("Interrupted while waiting for message on queue",
                                e);
                    }
                }
            } catch (Exception e) {
                LOG.warn("Exception when using channel: for id " + sid + " my id = " + 
                        self.getId() + " error = " + e);
            }
            this.finish();
......
 这个时候,其他机器通过IO线程RecvWorker收到消息

 

 

 public void run() {
            threadCnt.incrementAndGet();
            try {
                while (running && !shutdown && sock != null) {
                    /**
                     * Reads the first int to determine the length of the
                     * message
                     */
			//包的长度
                    int length = din.readInt();
                    if (length <= 0 || length > PACKETMAXSIZE) {
                        throw new IOException(
                                "Received packet with invalid packet: "
                                        + length);
                    }
                    /**
                     * Allocates a new ByteBuffer to receive the message
                     */
			//读到内存
                    byte[] msgArray = new byte[length];
                    din.readFully(msgArray, 0, length);
                    ByteBuffer message = ByteBuffer.wrap(msgArray);
			//添加到接收队列,后续业务层的接收线程WorkerReceiver会来拿消息
                    addToRecvQueue(new Message(message.duplicate(), sid));
                }
          ......
        }
 业务层的接受线程WorkerReceiver拿消息

 

 

 public void run() {

                Message response;
                while (!stop) {
                    // Sleeps on receive
                    try{
			//从IO线程拿数据
                        response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                        if(response == null) continue;

                        /*
                         * If it is from an observer, respond right away.
                         * Note that the following predicate assumes that
                         * if a server is not a follower, then it must be
                         * an observer. If we ever have any other type of
                         * learner in the future, we'll have to change the
                         * way we check for observers.
                         */
			//如果是Observer,则返回当前选举结果
                        if(!self.getVotingView().containsKey(response.sid)){
                            Vote current = self.getCurrentVote();
                            ToSend notmsg = new ToSend(ToSend.mType.notification,
                                    current.getId(),
                                    current.getZxid(),
                                    logicalclock,
                                    self.getPeerState(),
                                    response.sid,
                                    current.getPeerEpoch());

                            sendqueue.offer(notmsg);
                        }
			else {
                            // Receive new message
                            .......
                            // State of peer that sent this message
				//对方节点状态
                            QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                            switch (response.buffer.getInt()) {
                            case 0:
                                ackstate = QuorumPeer.ServerState.LOOKING;
                                break;
                            case 1:
                                ackstate = QuorumPeer.ServerState.FOLLOWING;
                                break;
                            case 2:
                                ackstate = QuorumPeer.ServerState.LEADING;
                                break;
                            case 3:
                                ackstate = QuorumPeer.ServerState.OBSERVING;
                                break;
                            }

                            // Instantiate Notification and set its attributes
				//初始化Notification对象
                            Notification n = new Notification();
                            n.leader = response.buffer.getLong();
                            n.zxid = response.buffer.getLong();
                            n.electionEpoch = response.buffer.getLong();
                            n.state = ackstate;
                            n.sid = response.sid;
                            ......

                            /*
                             * If this server is looking, then send proposed leader
                             */
				//如果自己也在LOOKING,则放入业务接收队列,选举主线程会消费该消息
                            if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
                                recvqueue.offer(n);

                                ......
                            } 
			    //如果自己不在选举中,而对方server在LOOKING中,则向其发送当前的选举结果,当有server加入一个essemble时有用
			    else {
                                /*
                                 * If this server is not looking, but the one that sent the ack
                                 * is looking, then send back what it believes to be the leader.
                                 */
                                Vote current = self.getCurrentVote();
                                if(ackstate == QuorumPeer.ServerState.LOOKING){
                                    if(LOG.isDebugEnabled()){
                                        LOG.debug("Sending new notification. My id =  " +
                                                self.getId() + " recipient=" +
                                                response.sid + " zxid=0x" +
                                                Long.toHexString(current.getZxid()) +
                                                " leader=" + current.getId());
                                    }
                                    ToSend notmsg = new ToSend(
                                            ToSend.mType.notification,
                                            current.getId(),
                                            current.getZxid(),
                                            logicalclock,
                                            self.getPeerState(),
                                            response.sid,
                                            current.getPeerEpoch());
                                    sendqueue.offer(notmsg);
                                }
                            }
                      .......
            }
由于整个集群只有3台机器,所以server.1和server.2启动后,即可选举出Leader。后续Leader和Follower开始数据交互,请看后文。
Leader选举小结
 1.server启动时默认选举自己,并向整个集群广播

 

2.收到消息时,通过3层判断:选举轮数,zxid,server id大小判断是否同意对方,如果同意,则修改自己的选票,并向集群广播
3.QuorumCnxManager负责IO处理,每2个server建立一个连接,只允许id大的server连id小的server,每个server启动单独的读写线程处理,使用阻塞IO
4.默认超过半数机器同意时,则选举成功,修改自身状态为LEADING或FOLLOWING
5.Obserer机器不参与选举
相关标签: zookeeper