欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

分布式选举-ZAB算法-2 Leader选举 代码实现

程序员文章站 2022-06-21 17:07:00
...

ZAB Leader选举实现

设定集群中有3个节点,通过ZAB算法实现选主。节点之间的通信使用的是自我实现的Remoting组件,基于Netty开发,可以以同步,异步的方式发起通信。在后续《分布式通信》系列的文章中,会向大家详细分析Remoting组件。

 

分布式选举的项目名称:justin-distribute-election

整体结构如图:

分布式选举-ZAB算法-2 Leader选举 代码实现

 

主要Package说明:

callback:异步请求消息的回调处理器集合

client:数据存取接口

message:投票、心跳等消息的集合

processor:接收请求消息的处理器集合

 

节点相关的类设计:

节点状态类:NodeStatus.class

public enum NodeStatus {
    LOOKING,
    LEADING,
    FOLLOWING,
}

节点类:Node.class

// 投票箱,记录集群中节点的投票信息
private final ConcurrentMap<Integer, Vote> voteBox =
new ConcurrentHashMap<>();
// 记录集群中Follower节点的数据ID
private final ConcurrentMap<Integer, ZxId> zxIdMap =
new ConcurrentHashMap<>();
// 记录集群中Follower节点的数据ID
private final ConcurrentMap<Integer, Boolean> snapshotMap =
new ConcurrentHashMap<>();
// 节点初始为Following状态
private volatile NodeStatus status = NodeStatus.FOLLOWING;
private Vote myVote;

投票类:Vote.class

public class Vote implements Comparable<Vote>{
    // 投票的节点ID
    private int nodeId;
    // 投票周期
    private volatile long epoch;
    // 被投票的节点ID
    private volatile int voteId;
    // 节点的数据ID
    private volatile ZxId lastZxId;
    
    // 先比较数据ID,如果相等再比较节点ID
    @Override
    public int compareTo(Vote o) {
        if (this.lastZxId.compareTo(o.lastZxId) != 0) {
            return this.lastZxId.compareTo(o.lastZxId);
        }else if (this.nodeId < o.nodeId) {
            return -1;
        }else if (this.nodeId > o.nodeId) {
            return 1;
        }
        return 0;
    }

线程设计:

  • mgrServer端线程,用于接收管理集群节点的消息;

  • server端线程,用于接收节点发送投票及数据同步的消息;

  • client端线程,用于向节点发送消息;

  • 心跳线程,用于Leader节点向Follower节点发送心跳消息;

  • 选举线程,用于在集群内选举主节点;

  • 节点发现线程,用于新节点加入集群;

 

    具体实现就不列出来了,很容易理解,大家看代码吧。

 

选举流程:

分布式选举-ZAB算法-2 Leader选举 代码实现

选举线程调用的方法:election()

private void election() {
    if (status == NodeStatus.LEADING) {
        return;
    }
    if (!nodeConfig.resetElectionTick()) {
        return;
    }

    status = NodeStatus.LOOKING;
    // 选举周期加1
    epoch += 1;
    // 清空集群中节点的数据ID记录
    zxIdMap.clear();
    // 获取最新的数据ID,组装投票信息
    this.myVote = new Vote(nodeConfig.getNodeId(),
                        nodeConfig.getNodeId(), 0, getLastZxId());
    this.myVote.setEpoch(epoch);
    // 将本地节点的投票放入投票箱
    this.voteBox.put(nodeConfig.getNodeId(), myVote);
    VoteMessage voteMessage = VoteMessage.getInstance();
    voteMessage.setVote(myVote);
    // 单向发送投票消息,不需要回复
    sendOneWayMsg(voteMessage.request());
}

投票消息处理类:VoteRequestProcessor.class

// 将其他节点的投票消息记入投票箱
node.getVoteBox().put(peerVote.getNodeId(), peerVote);
// 比较选举周期
if (peerVote.getEpoch() > node.getMyVote().getEpoch()) {
    node.getMyVote().setEpoch(peerVote.getEpoch());
    node.getMyVote().setVoteId(peerVote.getNodeId());
    node.setStatus(NodeStatus.LOOKING);
}else if (peerVote.getEpoch() == node.getMyVote().getEpoch()) {
    // 周期相同,则比较数据ID和节点ID
    if (peerVote.compareTo(node.getMyVote()) == 1) {
    // 比较后,如果对端数据ID或节点ID大,则将投票ID记录为对端节点ID
node.getMyVote().setVoteId(peerVote.getNodeId());
        node.setStatus(NodeStatus.LOOKING);
    }
}
// 得票超过半数,则成为Leader
if (node.isHalf()) {
    logger.info("Node:{} become leader!", node.getNodeConfig().getNodeId());
    node.becomeLeader();
}else if (node.getStatus() == NodeStatus.LOOKING){
    // 向其他节点发送新的投票消息
VoteMessage voteMsg = VoteMessage.getInstance();
    voteMsg.setVote(node.getMyVote());
    node.sendOneWayMsg(voteMsg.request());
}

心跳线程调用的方法:heartbeat()

long index = -1;
// 获取Follower节点应该同步的数据ID
if (zxIdMap.containsKey(entry.getKey())) {
    index = zxIdMap.get(entry.getKey()).getCounter();
}else {
    index = dataManager.getLastIndex();
}
// 获取数据
Data data = dataManager.read(index);
if (data.getZxId().getEpoch() == 0) {
    data.getZxId().setEpoch(epoch);
}
// 组装数据消息
DataMessage dataMsg = DataMessage.getInstance();
dataMsg.setNodeId(nodeConfig.getNodeId());
dataMsg.setType(DataMessage.Type.SYNC);
dataMsg.setData(data);
executorService.submit(new Runnable() {
    @Override
    public void run() {
        try {
            // 发送数据消息
            RemotingMessage response = client.invokeSync(
                       entry.getValue(), dataMsg.request(), 3*1000);
            DataMessage res = DataMessage.getInstance()
                                .parseMessage(response);    
            // 将Follower节点应该同步的数据ID放入zxIdMap中
            if (res.getSuccess()) {
int peerId = res.getNodeId();
                ZxId peerZxId = res.getData().getZxId();
                zxIdMap.put(peerId, peerZxId);
            }
        } catch (Exception e) {
            logger.error(e);
        }
    }
});

心跳消息处理类:DataRequestProcessor.class

logger.info("Receive heartbeat message: " + dataMsg);
// 重置选举计时器和心跳计时器
node.getNodeConfig().setPreElectionTime(System.currentTimeMillis());
node.getNodeConfig().setPreHeartbeatTime(System.currentTimeMillis());
// 节点切换为Following状态
node.setStatus(NodeStatus.FOLLOWING);
// 设置主节点ID
node.setLeaderId(dataMsg.getNodeId());

至此,ZAB算法的Leader选举代码实现完成。

 

接下来的文章我们来分析分布式数据复制的原理,同时完善Raft算法数据复制部分的代码。

代码地址:https://github.com/Justin02180218?tab=repositories


更多【分布式专辑】系列文章,请关注公众号

分布式选举-ZAB算法-2 Leader选举 代码实现

相关标签: 分布式架构