欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Python 基于python操纵zookeeper介绍

程序员文章站 2022-03-03 12:20:54
基于python操纵zookeeper介绍 by:授客 QQ:1033553122 测试环境 Win7 64位 Python 3.3.4 kazoo-2.6.1-py2.py3-none-any.whl(windows) kazoo-2.6.1.tar.gz (linux) https://pypi ......

基于python操纵zookeeper介绍

 

by:授客  qq:1033553122

测试环境

win7 64位

 

python 3.3.4

 

kazoo-2.6.1-py2.py3-none-any.whl(windows)

kazoo-2.6.1.tar.gz (linux)

 

zookeeper-3.4.13.tar.gz

下载地址:

 

 

 

 

代码实践

kazoostudy.py

#!/usr/bin/env python 3.4.0

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

 

import threading

import time

 

from kazoo.client import  kazooclient

from kazoo.client import kazoostate

from kazoo.retry import kazooretry

 

 

def restart_zk_client():

    '''重启zookeeper会话'''

    global zk_client

    global zk_conn_stat

    try:

        zk_client.restart()

    except exception as e:

        print('重启zookeeper客户端异常:%s' % e)

 

zk_conn_stat = 0 # zookeeper连接状态 1-lost   2-suspended 3-connected/reconnected

def zk_conn_listener(state):

    '''zookeeper连接状态监听器'''

 

    global  zk_conn_stat

    if state == kazoostate.lost:

        print('zookeeper connection lost')

        zk_conn_stat = 1

        # register somewhere that the session was lost

 

        thread = threading.thread(target=restart_zk_client)

        thread.start()

 

    elif state == kazoostate.suspended:

        print('zookeeper connection dicconnected')

        zk_conn_stat = 2

        # handle being disconnected from zookeeper

    else:

        zk_conn_stat = 3

        print('zookeeper connection cconnected/reconnected')

        # handle being connected/reconnected to zookeeper

 

# 监视器

# 当节点有变化、节点被删除时,将以多线程的方式调用以参数形式传递给get()、exists()的监视函数,监视函数将会接收到一个watchedevent实例

def event_listener(event):

    print(event)

 

 

 

if __name__ == '__main__':

    try:

        # 建立连接

        zk_client = kazooclient(hosts='127.0.0.1:2181')

        zk_client.add_listener(zk_conn_listener) # 添加监听器,监听连接状态

        zk_client.start() # 初始化到zk的连接,可以设置超时时间 zk_client.start(timeout=15) 默认15秒

 

        print('zk_client state:', zk_client.state) # 查看链接状态

 

        # 创建节点

        # ensure_path() 递归创建path中不存在的节点,但是不能为节点设置数据,仅acl.

        zk_client.ensure_path('/node1')

 

        # 创建永久节点

        # create创建节点的同时,可为节点设置数据,要求path路径必须存在

        if not zk_client.exists('/node1/subnode1'):

            zk_client.create('/node1/subnode1', b'sub node1')

 

        # 创建临时节点

        # 注意:会话丢失、重启会话会导致zookeeper删除重启会话前创建的临时节点

        if not zk_client.exists('/node1/subnode2'):

            zk_client.create('/node1/subnode2', b'sub node2', ephemeral=true)

 

        # 创建有序临时节点

        zk_client.create('/node1/subnode', b'sub nodexxxx', ephemeral=true, sequence=true)

        # 读取数据

        # 判断节点是否存在

        if zk_client.exists('/node1'): # 如果返回值为none则表示不存在给定节点

            print('存在节点node1,节点路径/node1')

 

            # 获取节点相关数据

            data, stat = zk_client.get('/node1')

            if stat:

                print("version: %s, data: %s" % (stat.version, data.decode("utf-8")))

 

            # 获取给定节点的子节点

            children = zk_client.get_children('/node1')

            print('node1子节点 有 %s 子节点,节点名称为: %s' % (len(children), children))

 

            print('/ 子节点', zk_client.get_children('/'))

 

        # 更新节点

        # 更新节点数据

        zk_client.set("/node1/subnode2", b"some new data")

 

        # 删除节点 recursive参数可选,递归删除节点数据

        zk_client.delete("/node1", recursive=true)

 

 

        # 重试命令

        try:

            result = zk_client.retry(zk_client.get, "/node1/subnode3")

            print(result)

 

            # 自定义重试

            # max_tries 出错最大重试次数, ignore_expire false-重试的时候忽略会话过期,否则不忽略

            kr = kazooretry(max_tries=3, ignore_expire=false)

            result = kr(zk_client.get, "/node1/subnode3")

        except exception as e:

            print('/node1/subnode3 不存在,所以会运行出错')

 

 

        # 释放客户端占用资源,移除连接

        zk_client.stop()

 

        #  zk_client.stop() 会导致zk_client连接状态变成 lost,进而触发线程调用函数 restart_zk_client,

        # 该函数未执行完成的情况下,如果马上执行类似get,create等函数,会导致运行出错

        #

 

        while zk_conn_stat != 3:

            continue

        else:

            i = 0

            while i < 3000:

                if i % 200 == 0:

                    time.sleep(2)

                    print('创建新节点')

                    zk_client.ensure_path('/node1')

                    zk_client.ensure_path('/node1/subnode2')

                    zk_client.create('/node1/subnode', b'sub nodexxxx', ephemeral=true, sequence=true)

                    zk_client.set('/node1/subnode2', b'new data')

                i += 1

 

 

 

        # 关闭客户端前必须先调用stop,否则会报错

        zk_client.stop()

 

        # 关闭客户端

        zk_client.close()

    except exception as e:

        print('运行出错:%s' % e)

 

 

monitor.py

#!/usr/bin/env python

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import time

 

from kazoo.client import  kazooclient

from kazoo.client import kazoostate

 

zk = kazooclient(hosts='10.118.52.26:2181')

zk.start()

 

@zk.add_listener

def my_listener(state):

    if state == kazoostate.lost:

        print('lost')

        # register somewhere that the session was lost

    elif state == kazoostate.suspended:

        print('suspended')

        # handle being disconnected from zookeeper

    else:

        pass

        print('connected')

        # handle being connected/reconnected to zookeeper

 

# 监视器

# 当节点有变化、节点被删除时,将以多线程的方式调用以参数形式传递给get()、exists()的监视函数,监视函数将会接收到一个watchedevent实例

def event_listener(event):

    print(event)

 

children = zk.get_children('/node1',watch=event_listener)

print('node1 has %s children with names %s' % (len(children), children))

 

# 更高级监视api

# 监视子节点的编号

@zk.childrenwatch('/node1')

def watch_children(children):

    print("children are now: %s" % children)

 

 

# 监视节点数据变更

@zk.datawatch("/node1/subnode2") #

def watch_node(data, state):

    """监视节点数据是否变化"""

    if state:

        print("version:", state.version, "data:", data)

 

# 空转

i = 0

while i< 100:

    # children = zk.get_children('/node1',watch=event_listener)

    # print('node1 has %s children with names %s' % (len(children), children))

    time.sleep(1)

 

zk.stop()

zk.close()

 

 

 

关于kazooclient连接状态说明

lost

connected

suspended

 

kazooclient客户端实例刚创建时,处于lost状态,同zookeeper建立连接后,转为connected 。如果连接出问题、切换到不同的zookeeper集群几点,转为suspended状态,当你知道暂时不能执行命令,如果zookeeper节点不再是集群的一部分,连接将丢失,也会导致 suspended状态

 

客户端再次同zookeeper建立连接,如果会话不存在,客户端连接状态将转为lost,如果会话没有过期,可用则转为connected

 

 

运行效果

Python 基于python操纵zookeeper介绍

Python 基于python操纵zookeeper介绍

 

参考链接: