zookeeper中的主要知识点
zookeeper中的一些主要的知识点
##1.应用场景
1.1 什么是zookeeper
- Zookeeper是一个分布式协调服务;就是为用户的分布式应用程序提供协调服务
- zookeeper是为别的分布式程序服务的
- Zookeeper本身就是一个分布式程序(只要有半数以上节点存活,zk就能正常服务)
- Zookeeper集群的角色: Leader 和 follower (Observer)
- zookeeper在底层最核心的两个功能:
- 管理(存储,读取)用户程序提交的数据
- 并为用户程序提供数据节点监听服务
1.2 zookeeper服务的应用场景
- 主从协调
- 服务器节点动态上下线,解决单点故障
- 统一配置管理
- 分布式共享锁
- 统一的命名空间
1.3 zookeeper集群特性
-
Zookeeper:一个leader,多个follower组成的集群
-
全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的
-
分布式读写,更新请求转发,由leader实施
-
更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行
-
数据更新原子性,一次数据更新要么成功,要么失败
-
实时性,在一定时间范围内,client能读到最新数据
1.4 zookeeper数据结构
-
层次化的目录结构,命名符合常规文件系统规范
-
每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识
-
节点Znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点)
-
客户端应用可以在节点上设置监视器
##1.5 Zookeeper 数据结构特点
- 每个子目录项如 NameService 都被称作为 znode,这个 znode 是被它所在的路径唯一标识,如 Server1 这个 znode 的标识为 /NameService/Server1
- znode 可以有子节点目录,并且每个 znode 可以存储数据,注意 EPHEMERAL类型的目录节点不能有子节点目录
- znode 是有版本的,每个 znode 中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据
- znode 可以是临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为 session,如果 znode 是临时节点,这个 session 失效,znode 也就删除了
- znode 的目录名可以自动编号,如 App1 已经存在,再创建的话,将会自动命名为 App2
- znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于这个特性实现的,后面在典型的应用场景中会有实例介绍
##2.服务器动态上下线感知
1、我们的服务器一启动的时候就去zookeeper去注册,zookeeper记录注册服务器的IP等信息。
2、去zookeeper注册的节点必须是临时节点,这样当服务器宕机下线的时候,zookeeper会把这个节点删除掉,这样才会产生事件,客户端才能监听到。
3、客户端一启动的时候就去连接zookeeper,然后去getChildren并且注册监听,获取当前在线服务器列表,然后选择服务器进行连接(可以获取到服务器的IP或者服务器的连接数量,这样还可以通过连接数量来进行负载均衡)
4、一旦服务器触发上下线的事件,就会通知到注册监听的客户端,客户端的监听器的process()方法就会调用,然后在process()方法内又可以继续getChildren并且注册监听(重新获取服务器列表并注册监听)
1,服务器启动的时候就去zookeeper注册,2,zookeeper记录注册的服务器的ip等信息,注意,创建的节点必须是临时节点,3,客户端启动就去zookeeper去getChildren获取服务器列表并注册监听,4,当服务器宕机下线的时候,zookeeper的临时节点就会被删除掉,触发监听事件,5,注册了监听器的客户端就会去调用process()方法,process()方法里面重新获取服务器列表并注册监听
现在假设有这样一种需求:服务端节点有多个,可以动态的上下线;需要让任意一台客户端都能实时感知服务端节点的变化,进而连接目前可提供服务的节点。
实现思路:我们可以借助于zookeeper这个第三方中间件,在每台服务器启动时都向zookeeper注册服务器的节点信息(比如:/servers/server01;/servers/server02);客户端每次调用之前都通过getChildren方法获取最新的服务器节点信息,同时客户端在zookeeper注册监听,监听服务器节点的变化;如果某刻服务器server01下线了,zookeeper就会发出节点变化通知客户端,回调process方法拉取最新的服务器节点信息。
服务端代码如下:
public class DistributeServer {
private static final String CONNECT_URL = "mini1:2181,mini2:2181,mini3:2181";
private static final int SESSION_TIME_OUT = 2000;
private static final String PARENT_NODE = "/servers";
private ZooKeeper zkCli = null;
/**
* @Description 获取连接
* @Author 刘俊重
* @Date 2017/12/13
*/
public void getConnect() throws Exception{
zkCli = new ZooKeeper(CONNECT_URL, SESSION_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.getType()+"-----------"+event.getPath());
try{
zkCli.getChildren("/", true);
}catch (Exception e){
}
}
});
}
/**
* @Description 服务器启动时向zookeeper注册服务信息
* @Author 刘俊重
* @Date 2017/12/13
*/
public void registerServer(String hostName) throws Exception {
String s = zkCli.create(PARENT_NODE + "/", hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("服务器:"+hostName+"已经注册完毕");
}
/**
* @Description 模拟实际的业务操作
* @Author 刘俊重
* @Date 2017/12/13
*/
public void handelBusiness(String hostname) throws Exception {
System.out.println("服务器:"+hostname+"正在处理业务。。。");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
DistributeServer server = new DistributeServer();
server.getConnect();
server.registerServer(args[0]);
server.handelBusiness(args[0]);
}
}
客户端代码如下:
/**
- @author 刘俊重
- @Description 模拟客户端,拉取最新服务器节点列表并向zookeeper设置监听
*/
public class DistributeClient {
private static final String CONNECT_URL = "mini1:2181,mini2:2181,mini3:2181";
private static final int SESSION_TIME_OUT = 2000;
private static final String PARENT_NODE = "/servers";
private ZooKeeper zkCli = null;
private volatile List<String> serverList = null;
/**
- @Description 获取连接
- @Author 刘俊重
- @Date 2017/12/13
*/
public void getConnect() throws Exception{
zkCli = new ZooKeeper(CONNECT_URL, SESSION_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
System.out.println(event.getType()+"-----------"+event.getPath());
try{
//重新更新服务器列表,并且注册了监听
getServerList();
}catch (Exception e){
}
}
});
}
/**
- @Description 获取服务器子节点信息,并对父节点进行监听
- @Author 刘俊重
*/
public void getServerList() throws Exception {
List<String> children = zkCli.getChildren(PARENT_NODE, true);
List<String> servers = new ArrayList<String>();
for(String child : children){
// child只是子节点的节点名
byte[] data = zkCli.getData(PARENT_NODE + "/" + child, false, null);
servers.add(new String(data));
}
//把servers赋值给成员变量serverList,以提供给各业务线程使用
serverList = servers;
System.out.println("节点数据:"+serverList);
}
/**
- @Description 模拟实际的业务操作
- @Author 刘俊重
- @Date 2017/12/13
*/
public void handelBusiness() throws Exception {
System.out.println("客户端开始工作。。。");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
DistributeClient client = new DistributeClient();
client.getConnect();
client.getServerList();
client.handelBusiness();
}
}
##3.主从协调
主-从模式的模型中,主要包括三个角色:
**主节点:**主要负责监视新的节点和任务,分配任务给可用的从节点;
**从节点:**通过注册自己,确保主节点看到它们可以执行任务,收到主节点分配的任务后,执行并记录状态;
**客户端:**创建新的任务并等待系统响应。
###3.1 主节点角色
因为只有一个进程会成为主节点,所以进程成为主节点后必须锁定管理权,因此进程需要创建名为/master的临时节点,并写入数据,记录该进程的信息,如IP,编号等。
其它进程在尝试创建/master成为主节点时,ZooKeeper会报错,提示该节点已存在。然而主节点可能会崩溃,其它节点需要接替它成为主节点,因此需要在主节点/master上设置监视点(watch)。当监视到/master不存在时,该进程再次创建/master节点,尝试成为主节点。
###3.2 从节点角色
从节点首先要通知主节点,告知主节点自己可以执行任务。从节点通过在/workers子节点下创建临时节点,并在字节点中使用主机名或IP来标识自己,如:/workers/worker1.example.com。主节点通过监视(watch)/workers节点,获取所有可用的从节点信息。
从节点需要在/assign下创建自己的子节点,用于接收任务分配,如/assign/worker1.example.com,并监视这个节点的变化,等待新的任务。
###3.3 客户端角色
客户端向系统中添加任务(有序节点),我们需要按照任务添加的顺序创建节点,其本质上是一个队列。如执行操作create -s /tasks/task-,会生成/tasks/task-00(数字依次递增),客户端需要知道该任务的完成状态,因此需要监视(watch)该节点。
###3.4各个角色相互配合实现主从协作
主节点监视/tasks,当客户端添加任务时,创建/tasks/task-00,主节点收到通知,会去/workers下检查可用的从节点。
获取到从节点列表后,选择其中一个从节点,分派任务:/assign/worker1.example.com/task-00。
从节点通过监视/assign/worker1.example.com,获取自己的任务,并执行。任务完成后,从节点会在/tasks/task-00下更新状态,告知客户端该任务已完成:/tasks/task-00/status。
客户端收到完成任务完成的通知后,整个任务的执行就结束了。当然任务可能会非常复杂,甚至涉及另一个分布式系统。但是不管是什么样的任务,执行的机制与通过ZooKeeper来传递结果,本质上都是一样的。
##4.配置管理
zookeeper提供了节点watch的功能,zookeeper的client(对外提供服务的server)监控zookeeper上的节点(znode),当节点变动的时候,client会收到变动事件和变动后的内容,基于zookeeper的这个特性,我们可以给服务器集群中的所有机器(client)都注册watch事件,监控特定znode,节点中存储部署代码的配置信息,需要更新代码的时候,修改znode中的值,服务器集群中的每一台server都会收到代码更新事件,然后触发调用,更新目标代码。也可以很容易的横向扩展,可以随意的增删机器,机器启动的时候注册监控节点事件即可。
##5.分布式共享锁
在zk上创建永久节点server,所有要访问资源的客户端在永久节点server**册临时有序节点,并且监听自己前一个节点。
***最小的获得锁可以访问资源,访问结束,断开连接,注册的临时节点被删除,他的下一个节点通过监听能够知道,
此时节点***变为最小,获取到了锁,可以访问资源。
假设现在集群中有50台机器对某台机器上的同一文件进行修改,如何才能保证这个文件不会被写乱呢,使用java中的synchronized锁肯定是不行的,因为这个锁是对某个程序而言的,而我们这根本就不是在一个服务器上,怎么会锁的住,用zookeeper实现的分布式锁可以实现。
设计思路:服务器启动时都去zookeeper上注册一个“短暂+序号”的znode节点(如/lock/1;/lock/2),并设置监听父节点变化;获取到父节点下所有子节点,并比较序号的大小;约定比如序号最小的获取锁,去操作某一文件,操作完成后删除自己的节点(相当于释放锁),并注册一个新的“短暂+序号”的znode节点;其它程序收到zookeeper发送的节点变化的通知之后,去比较序号的大小,看谁获得新锁。
public class DistributedClientLock {
// 会话超时
private static final int SESSION_TIMEOUT = 2000;
// zookeeper集群地址
private String hosts = "mini1:2181,mini2:2181,mini3:2181";
private String groupNode = "locks";
private String subNode = "sub";
private boolean haveLock = false;
private ZooKeeper zk;
// 记录自己创建的子节点路径
private volatile String thisPath;
/**
* 连接zookeeper
*/
public void connectZookeeper() throws Exception {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
// 判断事件类型,此处只处理子节点变化事件
if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
//获取子节点,并对父节点进行监听
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
// 去比较是否自己是最小id
Collections.sort(childrenNodes);
if (childrenNodes.indexOf(thisNode) == 0) {
//访问共享资源处理业务,并且在处理完成之后删除锁
doSomething();
//重新注册一把新的锁
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 1、程序一进来就先注册一把锁到zk上
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// wait一小会,便于观察
Thread.sleep(new Random().nextInt(1000));
// 从zk的锁父目录下,获取所有子节点,并且注册对父节点的监听
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
//如果争抢资源的程序就只有自己,则可以直接去访问共享资源
if (childrenNodes.size() == 1) {
doSomething();
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
/**
* 处理业务逻辑,并且在最后释放锁
*/
private void doSomething() throws Exception {
try {
System.out.println("gain lock: " + thisPath);
Thread.sleep(2000);
} finally {
System.out.println("finished: " + thisPath);
//释放锁
zk.delete(this.thisPath, -1);
}
}
public static void main(String[] args) throws Exception {
DistributedClientLock dl = new DistributedClientLock();
dl.connectZookeeper();
Thread.sleep(Long.MAX_VALUE);
}
}
6.选举机制
服务器初始化时Leader选举
zookeeper由于其自身的性质,一般建议选取奇数个节点进行搭建分布式服务器集群。以3个节点组成的服务器集群为例,说明服务器初始化时的选举过程。启动第一台安装zookeeper的节点时,无法单独进行选举,启动第二台时,两节点之间进行通信,开始选举Leader。
1)每个Server投出一票。他们两都选自己为Leader,投票的内容为(SID,ZXID)。SID即Server的id,安装zookeeper时配置文件中所配置的myid;ZXID,事务id,为节点的更新程度,ZXID越大,代表Server对Znode的操作越新。由于服务器初始化,每个Sever上的Znode为0,所以Server1投的票为(1,0),Server2为(2,0)。两Server将各自投票发给集群中其他机器。
2)每个Server接收来自其他Server的投票。集群中的每个Server先判断投票有效性,如检查是不是本轮的投票,是不是来Looking状态的服务器投的票。
3)对投票结果进行处理。先了解下处理规则
- 首先对比ZXID。ZXID大的服务器优先作为Leader
- 若ZXID相同,比如初始化的时候,每个Server的ZXID都为0,就会比较myid,myid大的选出来做Leader。
对于Server而言,他接受到的投票为(2,0),因为自身的票为(1,0),所以此时它会选举Server2为Leader,将自己的更新为(2,0)。而Server2收到的投票为Server1的(1,0)由于比他自己小,Server2的投票不变。Server1和Server2再次将票投出,投出的票都为(2,0)。
4) 统计投票。每次投票之后,服务器都会统计投票信息,如果判定某个Server有过半的票数投它,那么该Server将会作为Leader。对于Server1和Server2而言,统计出已经有两台机器接收了(2,0)的投票信息,此时认为选出了Leader。
5)改变服务器状态。当确定了Leader之后,每个Server更新自己的状态,Leader将状态更新为Leading,Follower将状态更新为Following。
服务器运行期间的Leader选举
zookeeper运行期间,如果有新的Server加入,或者非Leader的Server宕机,那么Leader将会同步数据到新Server或者寻找其他备用Server替代宕机的Server。若Leader宕机,此时集群暂停对外服务,开始在内部选举新的Leader。假设当前集群中有Server1、Server2、Server3三台服务器,Server2为当前集群的Leader,由于意外情况,Server2宕机了,便开始进入选举状态。过程如下
1) 变更状态。其他的非Observer服务器将自己的状态改变为Looking,开始进入Leader选举。
2) 每个Server发出一个投票(myid,ZXID),由于此集群已经运行过,所以每个Server上的ZXID可能不同。假设Server1的ZXID为145,Server3的为122,第一轮投票中,Server1和Server3都投自己,票分别为(1,145)、(3,122),将自己的票发送给集群中所有机器。
3) 每个Server接收接收来自其他Server的投票,接下来的步骤与初始化时相同。