Kafka与.net core(二)zookeeper
程序员文章站
2022-07-02 17:00:42
1.zookeeper简单介绍 1.1作用 zookeeper的作用是存储kafka的服务器信息,topic信息,和cunsumer信息。如下图: 而zookeeper是个什么东西呢?简单来说就是一个具有通知机制的文件系统,引用网路上的一张图 可以看出来zookeeper是一个树形的文件结构,我们可 ......
1.zookeeper简单介绍
1.1作用
zookeeper的作用是存储kafka的服务器信息,topic信息,和cunsumer信息。如下图:
而zookeeper是个什么东西呢?简单来说就是一个具有通知机制的文件系统,引用网路上的一张图
可以看出来zookeeper是一个树形的文件结构,我们可以自定义node与node的值,并对node进行监视,当node的结构或者值变化时,我们可以收到通知。
1.2node类型
1)persistent-持久化目录节点
客户端与zookeeper断开连接后,该节点依旧存在
2)persistent_sequential-持久化顺序编号目录节点
客户端与zookeeper断开连接后,该节点依旧存在,只是zookeeper给该节点名称进行顺序编号
3)ephemeral-临时目录节点
客户端与zookeeper断开连接后,该节点被删除
4)ephemeral_sequential-临时顺序编号目录节点
客户端与zookeeper断开连接后,该节点被删除,只是zookeeper给该节点名称进行顺序编号
2.zookeeper命令操作
连接zookeeper
[root@iz2zei2y693gtrgwlibzlwz ~]# zkcli.sh -server ip:2181
查看zookeeper的所有节点
ls /
查看某个节点的子节点
ls /brokers
创建节点
create /testaa dataaaa
获取节点的值
get /testaa
设置节点值
set /testaa aaabbb
删除节点
delete /testaa
这么看来实际上zookeeper跟数据库类似也是curd操作,我们再来看看zookeeper的安全控制acl
3.zookeeper的acl
3.1zk的节点有5种操作权限:
create、read、write、delete、admin 也就是 增、删、改、查、管理权限,这5种权限简写为crwda(即:每个单词的首字符缩写)
3.2身份的认证有4种方式:
world:默认方式,相当于全世界都能访问
auth:代表已经认证通过的用户(cli中可以通过addauth digest user:pwd 来添加当前上下文中的授权用户)
digest:即用户名:密码这种方式认证,这也是业务系统中最常用的
ip:使用ip地址认证
3.3acl实例
默认创建的时world方式,任何人都能访问,我们新建一个测试node节点
[zk: 3:2181(connected) 11] create /cys cys
访问一一下
[zk: 3:2181(connected) 13] get /cys
查看一下acl
[zk: 3:2181(connected) 15] getacl /cys
下面我们设置一下他的用户
命令为:
1)增加一个认证用户
addauth digest 用户名:密码明文
eg. addauth digest user1:password1
2)设置权限
setacl /path auth:用户名:密码明文:权限
eg. setacl /test auth:user1:password1:cdrwa
3)查看acl设置
getacl /path
具体操作如下:
addauth digest cys:123456 setacl /cys auth:cys:123456:crwda
我们ctrl+c退出zkcli,重新连接一下,然后查询
get /cys
结果如下:
提示我们认证失败,我们登陆一下
addauth digest cys:123456
结果如下:
我们在查看一下/cys节点的acl
可以看出来用户cys对应的密码(加密后的)和权限cdrwa
4..net core 操作
4.1新建server项目,引入zookeepernetex这个nuget包
server端代码
using org.apache.zookeeper; using org.apache.zookeeper.data; using system; using system.collections.generic; using system.text; using system.threading.tasks; namespace consoleapp3 { class program { static void main(string[] args) { while (true) { console.writeline("输入path"); var path = console.readline(); string address = "39.**.**.**:2181"; zookeeper _zookeeper = new zookeeper(address, 1000 * 1000, null); zookeeper.states states = _zookeeper.getstate(); //是否存在 task<org.apache.zookeeper.data.stat> stat = _zookeeper.existsasync(path); stat.wait(); if (stat.result != null && stat.status.tostring().tolower() == "rantocompletion".tolower()) { //已存在 console.writeline($"{path}已存在"); } else { console.writeline("输入data"); var data = console.readline(); //创建 task<string> task = _zookeeper.createasync(path, system.text.encoding.utf8.getbytes(data), zoodefs.ids.open_acl_unsafe, createmode.persistent); task.wait(); if (!string.isnullorempty(task.result) && task.status.tostring().tolower() == "rantocompletion".tolower()) { console.writeline($"{path}创建成功"); } } console.writeline("输入set data"); var dataa = console.readline(); //set值 task<org.apache.zookeeper.data.stat> stata = _zookeeper.setdataasync(path, system.text.encoding.utf8.getbytes(dataa)); stata.wait(); if (stata.result != null && stata.status.tostring().tolower() == "rantocompletion".tolower()) { console.writeline("set 成功"); } console.writeline("输入子path"); var childpath = console.readline(); console.writeline("输入子data"); var childdata = console.readline(); task<string> childtask = _zookeeper.createasync(childpath, system.text.encoding.utf8.getbytes(childdata), zoodefs.ids.open_acl_unsafe, createmode.persistent); childtask.wait(); if (!string.isnullorempty(childtask.result) && childtask.status.tostring().tolower() == "rantocompletion".tolower()) { console.writeline($"{childpath}创建成功"); } console.readline(); _zookeeper.closeasync().wait(); } ////删除 //console.writeline("输入delete path"); //var pathb = console.readline(); //task taska = _zookeeper.deleteasync(pathb); //taska.wait(); //if (taska.status.tostring().tolower() == "rantocompletion".tolower()) //{ // console.writeline("delete 成功"); //} ////获取数据 //task<dataresult> dataresult = _zookeeper.getdataasync(path, new nodewatcher()); //dataresult.wait(); //if (dataresult.result != null && dataresult.status.tostring().tolower() == "rantocompletion".tolower()) //{ // console.writeline(encoding.utf8.getstring(dataresult.result.data)); //} } } }
4.2新建client项目,引入zookeepernetex这个nuget包
客户端代码
using org.apache.zookeeper; using org.apache.zookeeper.data; using system; using system.collections.generic; using system.threading; using system.threading.tasks; namespace client { class program { static void main(string[] args) { console.writeline("输入path"); var path = console.readline(); string address = "39.**.**.**:2181"; zookeeper _zookeeper = new zookeeper(address, 10 * 1000, new defaultwatcher());
//用户登陆 _zookeeper.addauthinfo("digest", system.text.encoding.default.getbytes("cys:123456")); //获取child var getresult = _zookeeper.getchildrenasync(path, true); getresult.wait(); //获取数据 task<dataresult> dataresult = _zookeeper.getdataasync(path,true); dataresult.wait(); thread.sleep(100000); } } public class defaultwatcher : watcher { internal static readonly task completedtask = task.fromresult(1); /// <summary> /// 接收通知 /// </summary> /// <param name="event"></param> /// <returns></returns> public override task process(watchedevent @event) { console.writeline(string.format("接收到zookeeper服务端的通知,state是:{0},eventtype是:{1},path是:{2}", @event.getstate(), @event.get_type(), @event.getpath() ?? string.empty)); return completedtask; } } }
这样当server端操作的时候,client端会通过watcher收到通知
上一篇: 关于缓动动画函数的封装
下一篇: Java相关资料分享(视频+电子书籍)
推荐阅读
-
Z从壹开始前后端分离【 .NET Core2.0/3.0 +Vue2.0 】框架之二 || 后端项目搭建
-
.Net core_Excel 导出二维码(以导出箱单为例)
-
abp(net core)+easyui+efcore实现仓储管理系统——ABP WebAPI与EasyUI结合增删改查之一(二十七)
-
.net core 反射的介绍与使用
-
详解ASP.NET与ASP.NET Core用户验证Cookie并存解决方案
-
.net core i上 K8S(二)运行简单.netcore程序
-
详解.Net Core 权限验证与授权(AuthorizeFilter、ActionFilterAttribute)
-
ASP.NET Core 中的 ObjectPool 对象重用(二)
-
树莓派(4B)Linux + .Net Core嵌入式-HelloWorld(二)
-
Asp.net Core MVC中怎么把二级域名绑定到特定的控制器上