Implementing master election with ZooKeeper
程序员文章站
2022-07-12 14:57:42
...
Master election
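As distributed systems have become widespread, many services now have to run 24 hours a day. If a machine goes down, we want another machine to take over its work. This is usually handled with the master-slave pattern: under normal conditions the master provides the service while standby machines monitor its status, and when the master fails, one of the standbys is chosen to replace it. This selection process is called master election.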
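ZooKeeper is a good fit for master election for two reasons. First, it guarantees that a node at a given path can be created by only one client: if several clients try to create the same node at the same moment, exactly one succeeds and the others get a "node already exists" error. Because the ZooKeeper ensemble agrees on every write through a quorum, this guarantee still holds when individual servers fail or are cut off by network problems, as long as a majority of the ensemble remains reachable. Second, its watch mechanism lets clients subscribe to a node and be notified when it changes or is deleted.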
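To make the "only one creator wins" guarantee concrete without a running ensemble, here is a minimal in-process sketch. It is only an analogy: the hypothetical `ExclusiveCreateDemo` class uses `ConcurrentHashMap.putIfAbsent` in place of ZooKeeper's create call, which likewise succeeds for exactly one caller and fails for everyone else while the node exists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// In-process analogy only: putIfAbsent stands in for ZooKeeper's
// "create fails if the path already exists" semantics.
class ExclusiveCreateDemo {
    private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

    /** Returns true only for the single caller that actually created the path. */
    boolean tryCreate(String path, String owner) {
        return nodes.putIfAbsent(path, owner) == null;
    }

    String owner(String path) {
        return nodes.get(path);
    }

    /** Releases n competitors at the same moment and returns the names of the winners. */
    static List<String> race(ExclusiveCreateDemo store, int n) {
        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            CountDownLatch startGate = new CountDownLatch(1);
            List<Future<String>> results = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                String name = "Client #" + i;
                results.add(pool.submit(() -> {
                    startGate.await();   // all threads wait here, then create together
                    return store.tryCreate("/master", name) ? name : null;
                }));
            }
            startGate.countDown();
            List<String> winners = new ArrayList<>();
            for (Future<String> f : results) {
                String winner = f.get();
                if (winner != null) {
                    winners.add(winner);
                }
            }
            return winners;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

However many threads race, the returned list should contain exactly one name, and any later `tryCreate` on the same path fails until the entry is removed.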
Figure: implementation diagram
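In the diagram above, the left side shows the ZooKeeper node tree: the master node stores which machine is currently master, and the server node stores the available standby machines. Each server runs an election when it starts: all servers try to create the master node at the same time, only one succeeds, and that machine becomes master. The other machines watch the master node for changes. When the master machine goes down, the master node is deleted (it is an ephemeral node, so it disappears when the master's session ends), and the remaining machines run the election again.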
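The flow above can be simulated in memory to see the order of events. This is a sketch under simplifying assumptions rather than ZooKeeper's exact behavior: the hypothetical `EphemeralElectionSim` holds a single /master entry tied to a session id, expiring that session removes the entry and notifies every registered deletion listener, and listeners stay registered permanently. `SimClient` (also hypothetical) subscribes first, then competes, and competes again whenever it is told the node was deleted.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for the /master znode (not ZooKeeper itself).
class EphemeralElectionSim {
    private String masterData;      // data stored in /master, null when the node is absent
    private String masterSession;   // session that owns the ephemeral node
    private final List<Consumer<String>> deleteListeners = new CopyOnWriteArrayList<>();

    /** Mimics an ephemeral create: fails if /master already exists. */
    synchronized boolean createMaster(String clientName, String sessionId) {
        if (masterData != null) {
            return false;           // corresponds to ZkNodeExistsException
        }
        masterData = clientName;
        masterSession = sessionId;
        return true;
    }

    synchronized String currentMaster() {
        return masterData;
    }

    /** Listeners stay registered permanently in this sketch. */
    void subscribeDeletion(Consumer<String> listener) {
        deleteListeners.add(listener);
    }

    /** Session expiry deletes the owner's ephemeral node, then notifies listeners. */
    void expireSession(String sessionId) {
        synchronized (this) {
            if (!sessionId.equals(masterSession)) {
                return;             // this session owns no node
            }
            masterData = null;
            masterSession = null;
        }
        for (Consumer<String> listener : deleteListeners) {
            listener.accept("/master");
        }
    }
}

// Hypothetical client mirroring WorkClient's subscribe-then-compete logic.
class SimClient {
    private final String name;
    private final String sessionId;
    private final EphemeralElectionSim zk;
    private volatile boolean alive = true;

    SimClient(String name, String sessionId, EphemeralElectionSim zk) {
        this.name = name;
        this.sessionId = sessionId;
        this.zk = zk;
    }

    void start() {
        // Subscribe first so a deletion between the two steps is not missed
        zk.subscribeDeletion(path -> {
            if (alive) {
                zk.createMaster(name, sessionId);   // re-run the election
            }
        });
        zk.createMaster(name, sessionId);
    }

    /** Simulates a crash: the session ends and its ephemeral node disappears. */
    void crash() {
        alive = false;
        zk.expireSession(sessionId);
    }
}
```

Listeners are notified in registration order here, so the next live client in that order always wins; with a real ensemble the winner is simply whichever create request ZooKeeper processes first.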
Code implementation
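The implementation is below. WorkClient represents one machine. On startup it subscribes to changes on the master node and competes in the election. To make the effect visible, the master automatically releases mastership after 3 seconds, and whenever the master node is deleted the clients run the election again, here after a simulated 3-second delay.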
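import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.CreateMode;

public class WorkClient {
    private static final String MASTER = "/master";

    private final ZkClient zkClient;
    private final IZkDataListener zkDataListener;
    private final String clientName;
    // Read from the ZkClient event thread and the scheduler thread
    private volatile boolean isRunning;
    private final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);

    public WorkClient(ZkClient zkClient, String clientName) {
        this.zkClient = zkClient;
        this.clientName = clientName;
        this.zkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
                // Data changes on /master do not affect the election
            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                // /master was deleted: re-run the election after 3 seconds
                if (isRunning) {
                    executorService.schedule(new Runnable() {
                        @Override
                        public void run() {
                            takeMaster();
                        }
                    }, 3, TimeUnit.SECONDS);
                }
            }
        };
    }

    private boolean takeMaster() {
        if (!isRunning) {
            return false;   // a re-election scheduled before stop() must not run
        }
        try {
            System.out.println(clientName + " starts competing for master");
            // Only one client can create the ephemeral /master node
            zkClient.create(MASTER, clientName, CreateMode.EPHEMERAL);
            System.out.println(clientName + " was elected master");
            // For the demo, give up mastership after 3 seconds
            executorService.schedule(new Runnable() {
                @Override
                public void run() {
                    releaseMaster();
                }
            }, 3, TimeUnit.SECONDS);
        } catch (ZkNodeExistsException e) {
            // The node already exists: another client is master
            return false;
        }
        return true;
    }

    private void releaseMaster() {
        // Only delete /master if this client still owns it
        if (clientName.equals(zkClient.readData(MASTER, true))) {
            zkClient.delete(MASTER);
        }
    }

    public void start() {
        if (isRunning) {
            throw new IllegalStateException(clientName + " server is already running");
        }
        isRunning = true;
        System.out.println(clientName + " server start...");
        // Subscribe before competing so a deletion in between is not missed
        zkClient.subscribeDataChanges(MASTER, zkDataListener);
        takeMaster();
    }

    public void stop() {
        if (!isRunning) {
            throw new IllegalStateException(clientName + " server has already stopped");
        }
        isRunning = false;
        System.out.println(clientName + " server stop...");
        zkClient.unsubscribeDataChanges(MASTER, zkDataListener);
        executorService.shutdown();
    }
}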
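releaseMaster deletes /master so that the next election can start. A safer release deletes the node only if it still holds this client's name, so one client can never remove another client's mastership. That compare-and-delete semantics can be sketched in memory with `ConcurrentHashMap.remove(key, value)`, which removes the entry only if it currently maps to the given value, atomically. The `OwnedRelease` class is hypothetical; with ZkClient, checking ownership takes a readData call followed by a delete, two separate round trips, so this shows the intended behavior rather than an exact equivalent.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory analogy: the map plays the role of the znode tree.
class OwnedRelease {
    private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

    /** Like an exclusive create: succeeds only if the path is absent. */
    boolean take(String path, String owner) {
        return nodes.putIfAbsent(path, owner) == null;
    }

    /** Deletes the path only if it still holds this owner's name, in one atomic step. */
    boolean release(String path, String owner) {
        return nodes.remove(path, owner);
    }

    String owner(String path) {
        return nodes.get(path);
    }
}
```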
The test code is as follows:
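import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

// The enclosing class name is a placeholder; the original shows only its members
public class MasterElectionTest {
    private static final int CLIENT_QTY = 10;
    // The author's ZooKeeper server; replace with your own address
    private static final String SERVER = "47.96.233.184:2181";

    public static void main(String[] args) throws Exception {
        List<ZkClient> clients = new ArrayList<>();
        List<WorkClient> workClients = new ArrayList<>();
        try {
            for (int i = 0; i < CLIENT_QTY; ++i) {
                // One ZkClient (one ZooKeeper session) per simulated machine
                ZkClient client = new ZkClient(SERVER, 5000, 5000, new SerializableSerializer());
                clients.add(client);
                WorkClient workClient = new WorkClient(client, "Client #" + i);
                workClients.add(workClient);
                workClient.start();
            }
            System.out.println("Press Enter to exit!\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("Shutting down...");
            for (WorkClient client : workClients) {
                client.stop();
            }
            // Closing the session also removes any ephemeral /master it still owns
            for (ZkClient client : clients) {
                client.close();
            }
        }
    }
}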
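In the finally block, an exception from any single stop() call (stop() deliberately throws when a client is not running) aborts the loop, so the remaining WorkClients are not stopped and none of the ZkClient sessions are closed. One way around this, sketched below with a hypothetical `ShutdownAll` helper, is to run every shutdown step in its own try/catch and rethrow the first failure at the end, with later failures attached as suppressed exceptions.

```java
import java.util.List;

// Hypothetical helper: run every shutdown step even if earlier ones throw.
final class ShutdownAll {
    private ShutdownAll() {}

    /** One cleanup action, e.g. stopping a client or closing a session. */
    interface Step {
        void run() throws Exception;
    }

    static void runAll(List<Step> steps) {
        RuntimeException first = null;
        for (Step step : steps) {
            try {
                step.run();
            } catch (Exception e) {
                if (first == null) {
                    first = new RuntimeException("shutdown step failed", e);
                } else {
                    first.addSuppressed(e);   // keep later failures visible
                }
            }
        }
        if (first != null) {
            throw first;                      // report only after every step has run
        }
    }
}
```

In the test above, the steps would be `workClient::stop` for each WorkClient followed by `zkClient::close` for each ZkClient, so the sessions are closed even if one client fails to stop.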
Output: