欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

基于zookeeper分布式协调的动态上下线感知案例

程序员文章站 2022-07-07 14:46:26
...

一、动态上下线感知流程示意图

基于zookeeper分布式协调的动态上下线感知案例

二、详细解析

  当客户端很多时,服务器应付不过来,则需要增加服务器来提高运行效率,这时就可以引入zookeeper

1、当服务器一上线(启动)时,就在zookeeper上注册一个节点/servers/server0001;
2、客户端请求服务时,先去zookeeper上查看节点,解析value数据,得知请求哪台服务器,并且告诉zookeeper开启监听,有子节点发生变化通知客户端;
3、增加一台服务器后,会在zookeeper上注册,生成/servers/server0002节点;
4、zookeeper将增加子节点的事件通知客户端,客户端随机从两个服务器中挑选一个请求;
5、如果某台服务器运行时挂掉,即下线,与zookeeper失去连接,zookeeper感知到会自动删除该服务器对应节点,将减少了子节点事件通知给客户端,客户端便不再请求该服务器。

三、Java代码

1、服务端

/**
 * 服务端
 * 1:建立链接
 * 2:注册
 * 3:server端的业务处理
 * @author root
 *
 */
public class DistributedServer {
    private static final String connectString = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
    private static final int sessionTimeout = 2000;
    private static final String parentNode = "/servers";
    ZooKeeper zkClient = null;
    public void GetConnect() throws Exception {

         zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

            @Override
            public void process(WatchedEvent event) {
                try {
                    zkClient.getChildren(parentNode, true);
                } catch (KeeperException | InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        });
    }

    public void registerServer(String hostName) throws Exception, Exception {
        String create = zkClient.create(parentNode+"/server", hostName.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostName+" is online "+ create);
    }

    public void handleBussiness(String hostName) throws InterruptedException{
        System.out.println(hostName + " start working ......");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
        // 获取zk链接
        DistributedServer server = new DistributedServer();
        server.GetConnect();

        //利用zk链接向zk进行注册
        server.registerServer(args[0]);
        // 启动业务功能
        server.handleBussiness(args[0]);
    }
}

2、客户端

/**
 * 客户端
 * 1:链接zk
 * 2:获取列表(server端的列表)
 * 3:监听(接收到通知,更新列表)
 * @author root
 *
 */
public class DistributedClient {
    private static  final String connectString = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
    private static final int sessionTimeout = 2000;
    private static final String parentNode = "/servers";
    ZooKeeper zkClient = null;
    // volatile   ????
    private volatile List<String> serverList;

    /**
     * 获取zk链接
     * @throws Exception
     */
    public void getConnect() throws Exception{
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

            @Override
            public void process(WatchedEvent event) {
                //有数据变更,更新列表
                try {
                    getServerList();
                } catch (KeeperException | InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
    }
    /**
     * 获取服务器列表
     * @throws InterruptedException 
     * @throws KeeperException 
     */
    public void getServerList() throws KeeperException, InterruptedException{
        //获取到服务器列表 
        List<String> children = zkClient.getChildren(parentNode, true);
        //创建list集合,用来存放服务器;列表
        List<String> servers = new ArrayList<>();
        for (String key : children) {//
            //通过节点名字得到数据
            byte[] hostName = zkClient.getData(parentNode+"/"+key, false, null);
            servers.add(new String(hostName));
        }

        //把内部数据,赋给成员变量,
        serverList = servers;
        System.out.println(serverList);
    }

    /**
     * 处理业务逻辑
     * @throws InterruptedException 
     */
    public void handleBussiness() throws InterruptedException{
        System.out.println("client  start working ......");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
        //获取zk链接
        DistributedClient client = new DistributedClient();
        client.getConnect();

        //获取servers的自己节点列表(并作监听),从中获取服务器信息(ip或者hastname)
        client.getServerList();

        //业务线程启动
        client.handleBussiness();
    }

}