欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

基于ZooKeeper实现的分布式服务器上下线动态感知系统

程序员文章站 2022-05-07 15:41:51
...

原理大致如下图:
基于ZooKeeper实现的分布式服务器上下线动态感知系统

系统中会有多个消费者,会有多个服务器,zookeeper动态的管理服务器的数量。消费者要时刻知道服务器的数量和位置,当服务器更新的时候,消费者会受到队形的信息。并且两者都不知道对方的具体情况,所以只能通过中间的媒介—ZooKeeper实现。在这里,不写具体的业务逻辑,只写实现的流程逻辑,业务用输出语句代替。
客户端代码如下:

package sense;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ConsumerDis {

    List<String> onlineServers =null;
    ZooKeeper zk = null;
    public static  void main(String[] args) throws Exception {
        ConsumerDis consumer = new ConsumerDis();

        consumer.connectZK();

        consumer.getOnlineServers();

        consumer.requestService();

    }

    //建立连接
        public void connectZK() throws Exception {
            zk = new ZooKeeper("hadoop-master:2181,hadoop-slave02:2181,hadoop-slave03:2181", 2000, new Watcher() {

                public void process(WatchedEvent event) {
                    try {
                        getOnlineServers();
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
        }

        public void getOnlineServers() throws Exception{
            List<String> serverList = new ArrayList();
            List<String> children = zk.getChildren("/servers", false);
            for (String child : children) {
                byte[] data = zk.getData("/servers/"+child, false, null);
                serverList.add(new String(data));
            }

            onlineServers = serverList;
            System.out.println("本消费者查询了一次服务器列表: "+serverList);
        }

        public void requestService() throws Exception {
            Random random = new Random();
            while(true) {
                Thread.sleep(2000);
                if(onlineServers.size()==0) {
                    System.out.println("还没有在线的服务器");
                    continue;
                }
                String server = onlineServers.get(random.nextInt(onlineServers.size()));
                System.out.println("本消费者本次挑选了服务器:"+server);

            }
        }

}

服务器端代码如下:

package sense;

import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.security.Provider;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class Provide {

    ZooKeeper zk = null;

    public static void main(String[] args) throws Exception {

        Provide provide = new Provide();

        provide.connectZK();

        provide.registServerInfo();

        provide.handleService();

    }

    //建立连接
        public void connectZK() throws Exception {
            zk = new ZooKeeper("hadoop-master:2181,hadoop-slave02:2181,hadoop-slave03:2181", 2000, null);
        }


    //注册服务信息
    public void registServerInfo() throws Exception {
        String hostName = InetAddress.getLocalHost().getHostName();

        if(zk.exists("/servers", false)==null){
            zk.create("/servers", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        String path = zk.create("/servers/server", (hostName+":"+8080).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println(hostName+"成功注册了一个节点 : "+path);
    }

    //等待请求,处理业务
    public void handleService() throws Exception {
        ServerSocket ss = new ServerSocket(8080);
        while(true) {
            Socket accept = ss.accept();


        }

    }
}

注:运行的时候在Windows的运行效果不是很明显,可以打成jar包,运行在zookeeper的集群上,不同的节点上启动不同的程序。

可以直接打成Runnable Jar File,记住打jar包的时候,程序必须得先运行一下。

相关标签: ZooKeeper