基于zookeeper分布式协调的动态上下线感知案例
程序员文章站
2022-07-07 14:46:26
...
一、动态上下线感知流程示意图
二、详细解析
当客户端很多时,服务器应付不过来,则需要增加服务器来提高运行效率,这时就可以引入zookeeper
1、当服务器一上线(启动)时,就在zookeeper上注册一个节点/servers/server0001;
2、客户端请求服务时,先去zookeeper上查看节点,解析value数据,得知请求哪台服务器,并且告诉zookeeper开启监听,有子节点发生变化通知客户端;
3、增加一台服务器后,会在zookeeper上注册,生成/servers/server0002节点;
4、zookeeper将增加子节点的事件通知客户端,客户端随机从两个服务器中挑选一个请求;
5、如果某台服务器运行时挂掉,即下线,与zookeeper失去连接,zookeeper感知到会自动删除该服务器对应节点,将减少了子节点事件通知给客户端,客户端便不再请求该服务器。
三、Java代码
1、服务端
/**
* 服务端
* 1:建立链接
* 2:注册
* 3:server端的业务处理
* @author root
*
*/
public class DistributedServer {
private static final String connectString = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
private static final int sessionTimeout = 2000;
private static final String parentNode = "/servers";
ZooKeeper zkClient = null;
public void GetConnect() throws Exception {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
zkClient.getChildren(parentNode, true);
} catch (KeeperException | InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
public void registerServer(String hostName) throws Exception, Exception {
String create = zkClient.create(parentNode+"/server", hostName.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostName+" is online "+ create);
}
public void handleBussiness(String hostName) throws InterruptedException{
System.out.println(hostName + " start working ......");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 获取zk链接
DistributedServer server = new DistributedServer();
server.GetConnect();
//利用zk链接向zk进行注册
server.registerServer(args[0]);
// 启动业务功能
server.handleBussiness(args[0]);
}
}
2、客户端
/**
* 客户端
* 1:链接zk
* 2:获取列表(server端的列表)
* 3:监听(接收到通知,更新列表)
* @author root
*
*/
public class DistributedClient {
private static final String connectString = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
private static final int sessionTimeout = 2000;
private static final String parentNode = "/servers";
ZooKeeper zkClient = null;
// volatile ????
private volatile List<String> serverList;
/**
* 获取zk链接
* @throws Exception
*/
public void getConnect() throws Exception{
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
//有数据变更,更新列表
try {
getServerList();
} catch (KeeperException | InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
/**
* 获取服务器列表
* @throws InterruptedException
* @throws KeeperException
*/
public void getServerList() throws KeeperException, InterruptedException{
//获取到服务器列表
List<String> children = zkClient.getChildren(parentNode, true);
//创建list集合,用来存放服务器;列表
List<String> servers = new ArrayList<>();
for (String key : children) {//
//通过节点名字得到数据
byte[] hostName = zkClient.getData(parentNode+"/"+key, false, null);
servers.add(new String(hostName));
}
//把内部数据,赋给成员变量,
serverList = servers;
System.out.println(serverList);
}
/**
* 处理业务逻辑
* @throws InterruptedException
*/
public void handleBussiness() throws InterruptedException{
System.out.println("client start working ......");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
//获取zk链接
DistributedClient client = new DistributedClient();
client.getConnect();
//获取servers的自己节点列表(并作监听),从中获取服务器信息(ip或者hastname)
client.getServerList();
//业务线程启动
client.handleBussiness();
}
}