欢迎您访问程序员文章站!本站旨在为大家提供并分享程序员的计算机编程知识。
您现在的位置是: 首页

ZooKeeper客户端框架Curator

程序员文章站 2022-07-02 09:48:37
...
  • Curator介绍
关于Curator,这里不做过多介绍,网上的相关资料很多,可以参考这篇:
引用

  • Curator工具类
import java.util.Collections;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;

import com.zgjky.hec.common.constants.Constants;


/**
 * 客户端连接zookeeper的工具类 通过Curator 客户端框架实现
 * @author sheungxin
 *
 */
public class CuratorClient {
	private CuratorFramework curatorClient;
	private String namespace;
	private String connStr;
	private int retryInterval;
	private int maxRetries;
	private int connTimeoutMs;
	private int sessionTimeoutMs;
	
	public CuratorClient(String namespace,String connStr,int retryInterval,int maxRetries,int connTimeoutMs,int sessionTimeoutMs){
		this.namespace=namespace;
		this.connStr=connStr;
		this.retryInterval=retryInterval;
		this.maxRetries=maxRetries;
		this.connTimeoutMs=connTimeoutMs;
		this.sessionTimeoutMs=sessionTimeoutMs;
		connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
	}
	
	public CuratorClient(String namespace,String connStr,int retryInterval,int maxRetries){
		this.namespace=namespace;
		this.connStr=connStr;
		this.retryInterval=retryInterval;
		this.maxRetries=maxRetries;
		this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT;
		this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT;
		connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
	}

	public CuratorClient(String namespace,String connStr){
		this.namespace=namespace;
		this.connStr=connStr;
		this.retryInterval=Constants.ZK_RETRY_INTERVAL;
		this.maxRetries=Constants.ZK_MAX_RETRIES;
		this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT;
		this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT;
		connect(namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
	}
	
	public CuratorClient(String connStr){
		this.namespace=Constants.ZK_NAMESPACE;
		this.connStr=connStr;
		this.retryInterval=Constants.ZK_RETRY_INTERVAL;
		this.maxRetries=Constants.ZK_MAX_RETRIES;
		this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT;
		this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT;
		connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
	}
	
	/**
	 * 创建到zookeeper的连接
	 * @param connectionString zookeeper服务器连接字符串 格式: ip/域名:port  127.0.0.1:2181
	 * @param retryInterval 重试间隔
	 * @param maxRetries 重试次数
	 * @param connectionTimeoutMs 连接timeout时间
	 * @param sessionTimeoutMs session 失效时间
	 * @return
	 */
    public void connect(String namespace,String connStr, int retryInterval,int maxRetries, int connTimeoutMs,int sessionTimeoutMs){
        ACLProvider aclProvider=new ACLProvider() {
			private List<ACL> aclList;
			
			@Override
			public List<ACL> getDefaultAcl() {
				if(aclList==null){
					aclList=ZooDefs.Ids.CREATOR_ALL_ACL;
					aclList.clear();
					aclList.add(new ACL(Perms.ALL, new Id("auth",Constants.ZK_AUTH)));
				}
				return aclList;
			}
			
			@Override
			public List<ACL> getAclForPath(String arg0) {
				return aclList;
			}
		};
    	curatorClient = CuratorFrameworkFactory.builder()
        	.aclProvider(aclProvider)
        	.authorization("digest", Constants.ZK_AUTH.getBytes())
        	.namespace(namespace)
            .connectString(connStr)
            .retryPolicy(new ExponentialBackoffRetry(retryInterval, maxRetries))
            .connectionTimeoutMs(connTimeoutMs)
            .sessionTimeoutMs(sessionTimeoutMs)
            .build();
    	start();
    }
    
    /**
     * 添加连接状态监听器
     * @param listener
     */
    public void addConnListener(ConnectionStateListener listener){
    	curatorClient.getConnectionStateListenable().addListener(listener);
    }
    
    public void start(){
    	if(curatorClient.getState()!=CuratorFrameworkState.STARTED){
    		curatorClient.start();
    	}
    }
    
    public void disconnect(){
    	CloseableUtils.closeQuietly(curatorClient);
    }
    
    public CuratorFrameworkState getState(){
    	return curatorClient.getState();
    }
    
    /**
     * 创建全局可见的持久化节点
     * @param path 节点路径
     * @param payload 节点数据
     * @throws Exception
     */
    public void createNodes(String path, byte[] payload) throws Exception{
    	curatorClient.create().creatingParentsIfNeeded().forPath(path, payload);
    }
    
    /**
     * 创建全局可见的持久化节点
     * @param path 节点路径
     * @throws Exception
     */
    public void createNodes(String path) throws Exception{
    	curatorClient.create().creatingParentsIfNeeded().forPath(path);
    }
    
    /**
     * 创建全局可见的临时节点
     * @param path 节点路径
     * @param payload 节点数据
     * @throws Exception
     */
    public void createEphemeral(String path, byte[] payload) throws Exception{    	 
    	curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
    }
    
    /**
     *  创建全局可见的临时节点
     * @param path
     * @throws Exception
     */
    public String createEphemeral(String path) throws Exception{    	 
    	return curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
    }
    /**
     * 临时顺序节点
     * @param path
     * @return
     * @throws Exception
     */
    public String createEphemeralSeq(String path) throws Exception{    	 
    	return curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    }
    /**
     * 写入节点数据
     * @param path 节点路径
     * @param payload 节点数据
     * @throws Exception
     */
    public void setData(String path, byte[] payload) throws Exception{    	
    	curatorClient.setData().forPath(path,payload);
    }
    
    /**
     * 获取节点数据
     * @param path 节点路径
     * @return
     * @throws Exception
     */
    public String getData(String path) throws Exception{
    	String result=null;
    	if(isNodeExists(path)){
    		 result=new String(curatorClient.getData().forPath(path),Constants.ZK_CHARSET);
    	}
        return result;
    }  
    
    /**
     * 获取节点数据
     * @param path 节点路径
     * @return
     * @throws Exception
     */
    public List<String> getChildData(String path) throws Exception{
    	List<String> resultList=null;
    	if(isNodeExists(path)){
    		resultList=curatorClient.getChildren().forPath(path);
    	}
        return resultList;
    } 
    
    /**
     * 判断节点是否存在
     * @param path
     * @return
     */
    public boolean isNodeExists(String path){
    	boolean flag=false;
    	try {
    		flag=curatorClient.checkExists().forPath(path)!=null?true:false;
		} catch (Exception e) {
			e.printStackTrace();
		}
    	return flag;
    }
    
    /**
     * 删除指定节点
     * @param path
     * @throws Exception
     */
    public void delNode(String path) throws Exception{
    	curatorClient.delete().guaranteed().inBackground().forPath(path);
    }
    
    /**
     * 监控子节点的变化:添加、修改、删除
     * @param path
     * @param childCacheListener
     * @return 
     * @throws Exception
     */
    public PathChildrenCache pathChildrenCache(String path,PathChildrenCacheListener childCacheListener) throws Exception {
        final PathChildrenCache childCache = new PathChildrenCache(curatorClient, path, true);
        childCache.getListenable().addListener(childCacheListener);
        return childCache;
    }
    
    /**
     * 监控节点的变化:增加、修改
     * @param path
     * @param nodeCacheListener
     * @return
     */
    public NodeCache nodeCache(String path,NodeCacheListener nodeCacheListener) {
        final NodeCache nodeCache = new NodeCache(curatorClient, path);
        nodeCache.getListenable().addListener(nodeCacheListener);
        return nodeCache;
    }
 
    /**
     * 监控节点及子节点的变化:增加、修改、删除
     * @param path
     * @param treeCacheListener
     * @return
     */
    public TreeCache nodeCache(String path,TreeCacheListener treeCacheListener) {
        final TreeCache treeCache = new TreeCache(curatorClient, path);
        treeCache.getListenable().addListener(treeCacheListener);
        return treeCache;
    }
    
    public void addCuratorListener(CuratorListener curatorListener){
    	curatorClient.getCuratorListenable().addListener(curatorListener);
    }
    
}

相关标签: Zookeeper Curator