Zookeeper实现服务动态ip注册和发现,并能在网络波动后重连(重新注册)
#encoding=utf8
#架构:后端服务 --> zookeeper -> 注册模型服务
import socket
import time
from kazoo import security
from kazoo.client import kazooclient
from kazoo.client import eventtype, watchedevent
from kazoo.exceptions import nonodeerror, sessionexpirederror, connectionlossexception, noautherror
import random
from functools import wraps
class exceptionhandler(object):
def __init__(self):
#kazootimeouterror
pass
def __call__(self, func):
@wraps(func)
def wrapped_function(*args, **kwargs):
is_success = false
while not is_success:
try:
is_success = func(*args, **kwargs)
except (connectionlossexception, sessionexpirederror) as e:
is_success = false
except noautherror as e:
raise e
except exception as e:
is_success = false
time.sleep(1)
return wrapped_function
class zkmodelwatcher(object):
def __init__(self, hosts, acl_user, acl_pass, model_name, model_port):
hosts_arr = hosts.split(',')
self.acl = security.make_digest_acl(username=acl_user, password=acl_pass, read=true, create=true)
self.auth_data = auth_data = [("digest", f"{acl_user}:{acl_pass}")]
self._a_host = hosts_arr[0].split(':')[0]
self._a_port = int(hosts_arr[0].split(':')[1])
self._hosts = hosts #zookeeper集群连接地址
self.model_name = model_name #注册模型服务名称,用于zookeeper根节点名称
self.model_port = model_port #注册模型服务端口号
self._zkc = kazooclient(hosts=self._hosts)
def get_host_ip(self):
'''
用于获取注册模型服务的ip地址(能够动态改变)
后端服务与zookeeper在同一网段中,注册模型服务的请求地址与获取服务的网段一致
'''
try:
s = socket.socket(socket.af_inet, socket.sock_dgram)
s.connect((self._a_host, self._a_port))
ip = s.getsockname()[0]
finally:
s.close()
return ip
def create_model_node(self):
#创建临时、序列化节点。
ip = self.get_host_ip()
res = self._zkc.create(f'/{self.model_name}/model', bytes('%s:%s' % (ip, self.model_port), encoding='utf8')
,makepath=true, sequence=true, ephemeral=true)
return res
def restart_zk(self):
try:
self._zkc.stop()
except:
pass
self._zkc = kazooclient(hosts=self._hosts, default_acl=[self.acl], auth_data=self.auth_data)
self._zkc.start()
@exceptionhandler()
def register(self, data, stat, event):
print(event)
if event and event.type == eventtype.none:
self.restart_zk()
res = self.create_model_node()
self._zkc.datawatch(path=f'{res}', func=self.register)
if not self._zkc.exists(path=f'{res}'):
return false
print(res)
elif event and event.type == eventtype.deleted:
res = self.create_model_node()
self._zkc.datawatch(path=f'{res}', func=self.register)
if not self._zkc.exists(path=f'{res}'):
return false
print(res)
return true
def get_model_host(self):
#后端服务通过zookeeper获取注册模型服务的地址,如果注册模型服务存在多个,则需要随机选择
name = self.model_name
is_success = false
while not is_success:
children = self._zkc.get_children(f'/{name}')
if len(children) <= 0:
raise exception('没有可以运行的服务')
try:
index = random.randint(0, len(children) - 1)
host = self._zkc.get(f'/{name}/{children[index]}')
is_success = true
return host[0].decode()
except nonodeerror as e:
is_success = false
# 选中的节点已经失效的情况
@exceptionhandler()
def run(self):
self.restart_zk()
res = self.create_model_node()
self._zkc.datawatch(path=f'{res}', func=self.register)
if not self._zkc.exists(path=f'{res}'):
return false
self.get_model_host()
return true
def close(self):
try:
self._zkc.stop()
self._zkc.close()
except exception as e:
print(str(e))
if __name__ == '__main__':
acl_username = 'user'
acl_password = 'pass'
zw = zkmodelwatcher('127.0.0.1:2181,127.0.0.1:12181,127.0.0.1:22181', acl_username, acl_password, 'model', 5000)
zw.run()
time.sleep(4000) #此处可以开启注册模型服务
zw.close()
参考资料:
1、zookeeper简介 - https://blog.51cto.com/hmtk520/2105110
2、使用kazoo连接zookeeper并监听节点数量以及值变化 - https://blog.csdn.net/pysense/article/details/100709138
3、python kazoo 监视zookeeper节点数据发生变化 - https://blog.csdn.net/kwsy2008/article/details/52042303?utm_medium=distribute.pc_relevant.none-task-blog-2%7edefault%7eblogcommendfrommachinelearnpai2%7edefault-1.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7edefault%7eblogcommendfrommachinelearnpai2%7edefault-1.control
4、关于zookeeper中session expired和watch - https://www.coder4.com/archives/3181
5、zk session客户端过期(expired)过程 - https://blog.csdn.net/lovingprince/article/details/6885746
6、zookeeper session expired - https://blog.csdn.net/specialsun/article/details/84812575
7、zookeeper curator处理会话过期session expired - https://www.cnblogs.com/kangoroo/p/7538314.html
8、python获取自身ip - https://www.cnblogs.com/hei-hei-hei/p/10489924.html
上一篇: Mybaits入门