基于ZooKeeper实现的分布式服务器上下线动态感知系统
程序员文章站
2022-05-07 15:41:51
...
原理大致如下图:
系统中会有多个消费者,会有多个服务器,zookeeper动态的管理服务器的数量。消费者要时刻知道服务器的数量和位置,当服务器更新的时候,消费者会受到队形的信息。并且两者都不知道对方的具体情况,所以只能通过中间的媒介—ZooKeeper实现。在这里,不写具体的业务逻辑,只写实现的流程逻辑,业务用输出语句代替。
客户端代码如下:
package sense;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class ConsumerDis {
List<String> onlineServers =null;
ZooKeeper zk = null;
public static void main(String[] args) throws Exception {
ConsumerDis consumer = new ConsumerDis();
consumer.connectZK();
consumer.getOnlineServers();
consumer.requestService();
}
//建立连接
public void connectZK() throws Exception {
zk = new ZooKeeper("hadoop-master:2181,hadoop-slave02:2181,hadoop-slave03:2181", 2000, new Watcher() {
public void process(WatchedEvent event) {
try {
getOnlineServers();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
public void getOnlineServers() throws Exception{
List<String> serverList = new ArrayList();
List<String> children = zk.getChildren("/servers", false);
for (String child : children) {
byte[] data = zk.getData("/servers/"+child, false, null);
serverList.add(new String(data));
}
onlineServers = serverList;
System.out.println("本消费者查询了一次服务器列表: "+serverList);
}
public void requestService() throws Exception {
Random random = new Random();
while(true) {
Thread.sleep(2000);
if(onlineServers.size()==0) {
System.out.println("还没有在线的服务器");
continue;
}
String server = onlineServers.get(random.nextInt(onlineServers.size()));
System.out.println("本消费者本次挑选了服务器:"+server);
}
}
}
服务器端代码如下:
package sense;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.security.Provider;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class Provide {
ZooKeeper zk = null;
public static void main(String[] args) throws Exception {
Provide provide = new Provide();
provide.connectZK();
provide.registServerInfo();
provide.handleService();
}
//建立连接
public void connectZK() throws Exception {
zk = new ZooKeeper("hadoop-master:2181,hadoop-slave02:2181,hadoop-slave03:2181", 2000, null);
}
//注册服务信息
public void registServerInfo() throws Exception {
String hostName = InetAddress.getLocalHost().getHostName();
if(zk.exists("/servers", false)==null){
zk.create("/servers", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
String path = zk.create("/servers/server", (hostName+":"+8080).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostName+"成功注册了一个节点 : "+path);
}
//等待请求,处理业务
public void handleService() throws Exception {
ServerSocket ss = new ServerSocket(8080);
while(true) {
Socket accept = ss.accept();
}
}
}
注:运行的时候在Windows的运行效果不是很明显,可以打成jar包,运行在zookeeper的集群上,不同的节点上启动不同的程序。
可以直接打成Runnable Jar File,记住打jar包的时候,程序必须得先运行一下。