分布式选举-ZAB算法-2 Leader选举 代码实现
ZAB Leader选举实现
设定集群中有3个节点,通过ZAB算法实现选主。节点之间的通信使用的是自我实现的Remoting组件,基于Netty开发,可以以同步,异步的方式发起通信。在后续《分布式通信》系列的文章中,会向大家详细分析Remoting组件。
分布式选举的项目名称:justin-distribute-election
整体结构如图:
主要Package说明:
callback:异步请求消息的回调处理器集合
client:数据存取接口
message:投票、心跳等消息的集合
processor:接收请求消息的处理器集合
节点相关的类设计:
节点状态类:NodeStatus.class
public enum NodeStatus {
LOOKING,
LEADING,
FOLLOWING,
}
节点类:Node.class
// 投票箱,记录集群中节点的投票信息
private final ConcurrentMap<Integer, Vote> voteBox =
new ConcurrentHashMap<>();
// 记录集群中Follower节点的数据ID
private final ConcurrentMap<Integer, ZxId> zxIdMap =
new ConcurrentHashMap<>();
// 记录集群中Follower节点的数据ID
private final ConcurrentMap<Integer, Boolean> snapshotMap =
new ConcurrentHashMap<>();
// 节点初始为Following状态
private volatile NodeStatus status = NodeStatus.FOLLOWING;
private Vote myVote;
投票类:Vote.class
public class Vote implements Comparable<Vote>{
// 投票的节点ID
private int nodeId;
// 投票周期
private volatile long epoch;
// 被投票的节点ID
private volatile int voteId;
// 节点的数据ID
private volatile ZxId lastZxId;
// 先比较数据ID,如果相等再比较节点ID
@Override
public int compareTo(Vote o) {
if (this.lastZxId.compareTo(o.lastZxId) != 0) {
return this.lastZxId.compareTo(o.lastZxId);
}else if (this.nodeId < o.nodeId) {
return -1;
}else if (this.nodeId > o.nodeId) {
return 1;
}
return 0;
}
线程设计:
-
mgrServer端线程,用于接收管理集群节点的消息;
-
server端线程,用于接收节点发送投票及数据同步的消息;
-
client端线程,用于向节点发送消息;
-
心跳线程,用于Leader节点向Follower节点发送心跳消息;
-
选举线程,用于在集群内选举主节点;
-
节点发现线程,用于新节点加入集群;
具体实现就不列出来了,很容易理解,大家看代码吧。
选举流程:
选举线程调用的方法:election()
private void election() {
if (status == NodeStatus.LEADING) {
return;
}
if (!nodeConfig.resetElectionTick()) {
return;
}
status = NodeStatus.LOOKING;
// 选举周期加1
epoch += 1;
// 清空集群中节点的数据ID记录
zxIdMap.clear();
// 获取最新的数据ID,组装投票信息
this.myVote = new Vote(nodeConfig.getNodeId(),
nodeConfig.getNodeId(), 0, getLastZxId());
this.myVote.setEpoch(epoch);
// 将本地节点的投票放入投票箱
this.voteBox.put(nodeConfig.getNodeId(), myVote);
VoteMessage voteMessage = VoteMessage.getInstance();
voteMessage.setVote(myVote);
// 单向发送投票消息,不需要回复
sendOneWayMsg(voteMessage.request());
}
投票消息处理类:VoteRequestProcessor.class
// 将其他节点的投票消息记入投票箱
node.getVoteBox().put(peerVote.getNodeId(), peerVote);
// 比较选举周期
if (peerVote.getEpoch() > node.getMyVote().getEpoch()) {
node.getMyVote().setEpoch(peerVote.getEpoch());
node.getMyVote().setVoteId(peerVote.getNodeId());
node.setStatus(NodeStatus.LOOKING);
}else if (peerVote.getEpoch() == node.getMyVote().getEpoch()) {
// 周期相同,则比较数据ID和节点ID
if (peerVote.compareTo(node.getMyVote()) == 1) {
// 比较后,如果对端数据ID或节点ID大,则将投票ID记录为对端节点ID
node.getMyVote().setVoteId(peerVote.getNodeId());
node.setStatus(NodeStatus.LOOKING);
}
}
// 得票超过半数,则成为Leader
if (node.isHalf()) {
logger.info("Node:{} become leader!", node.getNodeConfig().getNodeId());
node.becomeLeader();
}else if (node.getStatus() == NodeStatus.LOOKING){
// 向其他节点发送新的投票消息
VoteMessage voteMsg = VoteMessage.getInstance();
voteMsg.setVote(node.getMyVote());
node.sendOneWayMsg(voteMsg.request());
}
心跳线程调用的方法:heartbeat()
long index = -1;
// 获取Follower节点应该同步的数据ID
if (zxIdMap.containsKey(entry.getKey())) {
index = zxIdMap.get(entry.getKey()).getCounter();
}else {
index = dataManager.getLastIndex();
}
// 获取数据
Data data = dataManager.read(index);
if (data.getZxId().getEpoch() == 0) {
data.getZxId().setEpoch(epoch);
}
// 组装数据消息
DataMessage dataMsg = DataMessage.getInstance();
dataMsg.setNodeId(nodeConfig.getNodeId());
dataMsg.setType(DataMessage.Type.SYNC);
dataMsg.setData(data);
executorService.submit(new Runnable() {
@Override
public void run() {
try {
// 发送数据消息
RemotingMessage response = client.invokeSync(
entry.getValue(), dataMsg.request(), 3*1000);
DataMessage res = DataMessage.getInstance()
.parseMessage(response);
// 将Follower节点应该同步的数据ID放入zxIdMap中
if (res.getSuccess()) {
int peerId = res.getNodeId();
ZxId peerZxId = res.getData().getZxId();
zxIdMap.put(peerId, peerZxId);
}
} catch (Exception e) {
logger.error(e);
}
}
});
心跳消息处理类:DataRequestProcessor.class
logger.info("Receive heartbeat message: " + dataMsg);
// 重置选举计时器和心跳计时器
node.getNodeConfig().setPreElectionTime(System.currentTimeMillis());
node.getNodeConfig().setPreHeartbeatTime(System.currentTimeMillis());
// 节点切换为Following状态
node.setStatus(NodeStatus.FOLLOWING);
// 设置主节点ID
node.setLeaderId(dataMsg.getNodeId());
至此,ZAB算法的Leader选举代码实现完成。
接下来的文章我们来分析分布式数据复制的原理,同时完善Raft算法数据复制部分的代码。
代码地址:https://github.com/Justin02180218?tab=repositories
更多【分布式专辑】系列文章,请关注公众号
上一篇: 购物车实战案例