欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ZooKeeper分布式

程序员文章站 2022-07-12 17:14:15
...

1:zk的相关特性

1、一致性:数据一致性,数据按顺序分批入库。
2、原子性:事务要么都成功,要么都失败,不会局部化。
3、单一视图:客户端连接集群中的任一zk节点,数据都是一致的。
4、可靠性:每次对zk的操作状态都保存在服务器中。
5、实时性:客户端可以读取zk服务端的最新数据

2:linux上环境变量在/etc/profile 中编辑过后需要执行source /etc/profile 重新导入刚刚的配置

3:Zookeeper中zoo.cfg配置

1、tickTime:用于计算的时间单元。比如session超时:N*tickTime.
2、initLimit:用于集群,允许从节点连接并同步到 master节点的初始化连接时间,以tickTime 的倍数来表示.
3、syncLimit:用于集群, master主节点与 从节点 之间发送消息,请求和应答 时间长度(心跳机制)
4、dataDir:必须配置。
5、dataLogDir:日志目录。
6、clientPort:连接服务器的端口,默认2181

4:基本数据类型

1、是一个树型结构,类似前端开发中的tree.js组件。
2、每个节点称之为znode,它可以有子节点,也可以有数据。
3、每个节点分为临时节点和永久节点,临时节点会在客户端断开后消失。
4、每个zk节点都各自的版本号,可以通过命令行来显示节点信息。
5、每个节点数据发生变化,那么节点的版本号会累加(乐观锁)。
6、删除、修改过期节点,版本号不匹配则会报错。
6、每个zk节点存储的数据不宜过大,几K即可。
7、节点可以设置acl,可以通过权限来控制用户的访问

5:Linux的ZK客户端命令行学习:

./zkCli.sh可以启动客户端,使用help命令查看命令详解,使用Ctrl+C可以退出客户端:

zkCli.sh -server [ip]:[port] 连接zk

查看服务器状态

zkServer.sh status
启停服务器
zkServer.sh start

ZooKeeper -server host:port cmd args

stat path [watch] //stat命令用于查看节点的状态信息

set path data [version] //set命令用于设置节点的数据

ls path [watch] //ls命令用于获取路径下的节点信息,注意路径为绝对路径

delquota [-n|-b] path //delquota命令用于删除配额,-n为子节点个数,-b为节点数据长度

ls2 path [watch] //ls2命令是ls命令的增强版,比ls命令多输出本节点信息

setAcl path acl // setAcl命令用于设置节点Acl,Acl由三部分构成:1为scheme,2为user,3为permission,一般情况下表示为scheme:id:permissions

setquota -n|-b val path //setquota命令用于设置节点个数以及数据长度的配额

history //history用于列出最近的命令历史,可以和redo配合使用

redo cmdno //redo命令用于再次执行某个命令

printwatches on|off //printWatchers命令用于设置和显示监视状态,值为on或则off

delete path [version] //delete命令用于删除节点,如delete /nodeDelete

sync path //sync命令用于强制同步,由于请求在半数以上的zk server上生效就表示此请求生效,那么就会有一些zk server上的数据是旧的。sync命令就是强制同步所有的更新操作。

listquota path //查看指定znode的配额

rmr path //递归删除

get path [watch] //get命令用于获取节点的信息,注意节点的路径必须是以/开头的绝对路径。如get /

create [-s] [-e] path data acl //create命令用于创建节点,其中-s为顺序充点,-e临时节点

addauth scheme auth //addauth命令用于节点认证,使用方式:如addauth digest username:password

quit //退出客户端

getAcl path //获取节点的Acl,如getAcl /node1

close //close命令用于关闭与服务端的链接

connect host:port //连接zk服务端,与close命令配合使用可以连接或者断开zk服务端

返回信息的具体含义:

cZxid = 0x0 //节点创建时的zxid

ctime = Thu Jan 01 08:00:00 CST 1970 //节点创建时间

mZxid = 0x0 //节点最近一次更新时的zxid

mtime = Thu Jan 01 08:00:00 CST 1970 //节点最近一次更新的时间

pZxid = 0x2c //子节点的id

cversion = 10 //子节点数据更新次数

dataVersion = 0 //本节点数据更新次数

aclVersion = 0 //节点ACL(授权信息)的更新次数

ephemeralOwner = 0x0 //如果该节点为临时节点,ephemeralOwner值表示与该节点绑定的session id. 如果该节点不是临时节点,ephemeralOwner值为0

dataLength = 0 //节点数据长度,本例中为hello world的长度

numChildren = 10 //子节点个数

6:zk的作用体现:

1、master节点选举,主节点挂了以后,从节点就会接手工作,并且保证这个节点是唯一的,这也就是所谓的首脑模式,从而保证我们的集群是高可用的。

2、统一配置文件管理,即只需要部署一台服务器,则可以把相同的配置文件同步更新到其他所有服务器,此操作在云计算的特别多。

3、发布与订阅,类似消息队列MQ(amq,rmq),dubbo发布者把数据存在znode上,订阅者会读取这个数据。

4、提供分布式锁,分布式环境中不同进程之间争夺资源,类似多线程中的锁。

5、集群管理,集群中保证数据的强一致性。

7::Zookeeper-watcher机制

1、针对每个节点的操作,都会有一个监督者->wathcer。
2、当监控的某个对象(znode)发生了变化。则触发wathcer事件。
3、zk中的wathcer是一次性的,触发后立即销毁。
4、父节点、子节点 增删改查都能够触发其wathcer
5、针对不同类型的操作,触发的wathcer事件是不同的。子节点的创建事件,子节点的删除事件,子节点数据变化事件
6、ls 为父节点设置watcher,创建子节点触发:NodeChildChanged
7、ls 为父节点设置watcher,删除子节点触发:NodeChildChanged
8、ls 为父节点设置watcher,修改子节点不触发事件

get和stat的watch机制都是把当前节点作为父节点的

8:Apache Curator客户端的使用

需要依赖一下

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>

 

public class CuratorOperator {

public CuratorFramework client = null;
public static final String zkServerPath = "192.168.1.110:2181";

/**
* 实例化zk客户端
*/
public CuratorOperator() {
/**
* 同步创建zk示例,原生api是异步的
*
* curator链接zookeeper的策略:ExponentialBackoffRetry
* baseSleepTimeMs:初始sleep的时间
* maxRetries:最大重试次数
* maxSleepMs:最大重试时间
*/
// RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);

/**
* curator链接zookeeper的策略:RetryNTimes
* n:重试的次数
* sleepMsBetweenRetries:每次重试间隔的时间
*/
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

/**
* curator链接zookeeper的策略:RetryOneTime
* sleepMsBetweenRetry:每次重试间隔的时间
*/
// RetryPolicy retryPolicy2 = new RetryOneTime(3000);

/**
* 永远重试,不推荐使用
*/
// RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)

/**
* curator链接zookeeper的策略:RetryUntilElapsed
* maxElapsedTimeMs:最大重试时间
* sleepMsBetweenRetries:每次重试间隔
* 重试时间超过maxElapsedTimeMs后,就不再重试
*/
// RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);

client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
}

/**
*
* @Description: 关闭zk客户端连接
*/
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}

public static void main(String[] args) throws Exception {
// 实例化
CuratorOperator cto = new CuratorOperator();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

// 创建节点
String nodePath = "/super/imooc";
byte[] data = "superme".getBytes();
cto.client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(Ids.OPEN_ACL_UNSAFE)
.forPath(nodePath, data);

// 更新节点数据
// byte[] newData = "batman".getBytes();
// cto.client.setData().withVersion(0).forPath(nodePath, newData);

// 删除节点
// cto.client.delete()
// .guaranteed() // 如果删除失败,那么在后端还是继续会删除,直到成功
// .deletingChildrenIfNeeded() // 如果有子节点,就删除
// .withVersion(0)
// .forPath(nodePath);
不会删除workspace


// 读取节点数据
// Stat stat = new Stat();
// byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);
// System.out.println("节点" + nodePath + "的数据为: " + new String(data));
// System.out.println("该节点的版本号为: " + stat.getVersion());


// 查询子节点
// List<String> childNodes = cto.client.getChildren()
// .forPath(nodePath);
// System.out.println("开始打印子节点:");
// for (String s : childNodes) {
// System.out.println(s);
// }
不会打印workspace

// 判断节点是否存在,如果不存在则为空
// Stat statExist = cto.client.checkExists().forPath(nodePath + "/abc");
// System.out.println(statExist);


// watcher 事件 当使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
// cto.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
// cto.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);

// 为节点添加watcher
// NodeCache: 监听数据节点的变更,会触发事件
// final NodeCache nodeCache = new NodeCache(cto.client, nodePath);
// // buildInitial : 初始化的时候获取node的值并且缓存
// nodeCache.start(true);
// if (nodeCache.getCurrentData() != null) {
// System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
// } else {
// System.out.println("节点初始化数据为空...");
// }
// nodeCache.getListenable().addListener(new NodeCacheListener() {
// public void nodeChanged() throws Exception {
// if (nodeCache.getCurrentData() == null) {
// System.out.println("空");
// return;
// }
// String data = new String(nodeCache.getCurrentData().getData());
// System.out.println("节点路径:" + nodeCache.getCurrentData().getPath() + "数据:" + data);
// }
// });


// 为子节点添加watcher
// PathChildrenCache: 监听数据节点的增删改,会触发事件
String childNodePathCache = nodePath;
// cacheData: 设置缓存节点的数据状态
final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, childNodePathCache, true);
/**
* StartMode: 初始化方式
* POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
* NORMAL:异步初始化
* BUILD_INITIAL_CACHE:同步初始化
*/
childrenCache.start(StartMode.POST_INITIALIZED_EVENT);

List<ChildData> childDataList = childrenCache.getCurrentData();
System.out.println("当前数据节点的子节点数据列表:");
for (ChildData cd : childDataList) {
String childData = new String(cd.getData());
System.out.println(childData);
}

childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
System.out.println("子节点初始化ok...");
}

else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
String path = event.getData().getPath();
if (path.equals(ADD_PATH)) {
System.out.println("添加子节点:" + event.getData().getPath());
System.out.println("子节点数据:" + new String(event.getData().getData()));
} else if (path.equals("/super/imooc/e")) {
System.out.println("添加不正确...");
}

}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
System.out.println("删除子节点:" + event.getData().getPath());
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("修改子节点路径:" + event.getData().getPath());
System.out.println("修改子节点数据:" + new String(event.getData().getData()));
}
}
});

Thread.sleep(100000);

cto.closeZKClient();
boolean isZkCuratorStarted2 = cto.client.isStarted();
System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
}

public final static String ADD_PATH = "/super/imooc/d";

}
 
public class MyCuratorWatcher implements CuratorWatcher {

@Override
public void process(WatchedEvent event) throws Exception {
System.out.println("触发watcher,节点路径为:" + event.getPath());
}

}
public class MyWatcher implements Watcher {

@Override
public void process(WatchedEvent event) {
System.out.println("触发watcher,节点路径为:" + event.getPath());
}
}

9:zk设置配置文件

public class Client1 {

public CuratorFramework client = null;
public static final String zkServerPath = "192.168.1.110:2181";

public Client1() {
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
}

public void closeZKClient() {
if (client != null) {
this.client.close();
}
}

// public final static String CONFIG_NODE = "/super/imooc/redis-config";
public final static String CONFIG_NODE_PATH = "/super/imooc";
public final static String SUB_PATH = "/redis-config";
public static CountDownLatch countDown = new CountDownLatch(1);

public static void main(String[] args) throws Exception {
Client1 cto = new Client1();
System.out.println("client1 启动成功...");

final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);
childrenCache.start(StartMode.BUILD_INITIAL_CACHE);

// 添加监听事件
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
// 监听节点变化
if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
String configNodePath = event.getData().getPath();
if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {
System.out.println("监听到配置发生变化,节点路径为:" + configNodePath);

// 读取节点数据
String jsonConfig = new String(event.getData().getData());
System.out.println("节点" + CONFIG_NODE_PATH + "的数据为: " + jsonConfig);

// 从json转换配置
RedisConfig redisConfig = null;
if (StringUtils.isNotBlank(jsonConfig)) {
redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);
}

// 配置不为空则进行相应操作
if (redisConfig != null) {
String type = redisConfig.getType();
String url = redisConfig.getUrl();
String remark = redisConfig.getRemark();
// 判断事件
if (type.equals("add")) {
System.out.println("监听到新增的配置,准备下载...");
// ... 连接ftp服务器,根据url找到相应的配置
Thread.sleep(500);
System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");
// ... 下载配置到你指定的目录
Thread.sleep(1000);
System.out.println("下载成功,已经添加到项目中");
// ... 拷贝文件到项目目录
} else if (type.equals("update")) {
System.out.println("监听到更新的配置,准备下载...");
// ... 连接ftp服务器,根据url找到相应的配置
Thread.sleep(500);
System.out.println("开始下载配置文件,下载路径为<" + url + ">");
// ... 下载配置到你指定的目录
Thread.sleep(1000);
System.out.println("下载成功...");
System.out.println("删除项目中原配置文件...");
Thread.sleep(100);
// ... 删除原文件
System.out.println("拷贝配置文件到项目目录...");
// ... 拷贝文件到项目目录
} else if (type.equals("delete")) {
System.out.println("监听到需要删除配置");
System.out.println("删除项目中原配置文件...");
}

// TODO 视情况统一重启服务
}
}
}
}
});

countDown.await();

cto.closeZKClient();
}

}

 

 

public class RedisConfig {

private String type; // add 新增配置 update 更新配置 delete 删除配置
private String url; // 如果是add或update,则提供下载地址
private String remark; // 备注

public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getRemark() {
return remark;
}
public void setRemark(String remark) {
this.remark = remark;
}
}
RedisConfig.json相关代码
{"type":"add","url":"ftp://192.168.10.123/config/redis.xml","remark":"add"}
{"type":"update","url":"ftp://192.168.10.123/config/redis.xml","remark":"update"}
{"type":"delete","url":"","remark":"delete"}

10:zookeeper集群搭建

	zk集群, 主从节点, 心跳机制(选举模式)

 


	zookeeper集群搭建注意点:
	在dataDir创建并配置数据文件 myid 内容为1/2/3 对应 server.1/2/3
	通过 ./zkCli.sh -server [ip]:[port] 检测集群是否配置成功
	例:
		修改conf文件夹下的zoo.cfg文件加入:
			server.1=192.168.56.105:2888:3888
			server.2=192.168.56.105:2889:3889
			server.3=192.168.56.105:2890:3890
		再到dataDir里添加myid文件内容为1, 第二台集群则myid配置为2

11:ZooKeeper原生Java API的不足之处:

  • 在连接zk超时的时候,不支持自动重连,需要手动操作
  • Watch注册一次就会失效,需要反复注册
  • 不支持递归创建节点

Apache curator:

  • Apache 的开源项目
  • 解决Watch注册一次就会失效的问题
  • 提供的 API 更加简单易用
  • 提供更多解决方案并且实现简单,例如:分布式锁
  • 提供常用的ZooKeeper工具类
  • 编程风格更舒服,

 

12