ZooKeeper客户端框架Curator
程序员文章站
2022-07-02 09:30:50
...
- Curator介绍
引用
- Curator工具类
import java.util.List; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.api.CuratorListener; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs.Perms; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import com.zgjky.hec.common.constants.Constants; /** * 客户端连接zookeeper的工具类 通过Curator 客户端框架实现 * @author sheungxin * */ public class CuratorClient { private CuratorFramework curatorClient; private String namespace; private String connStr; private int retryInterval; private int maxRetries; private int connTimeoutMs; private int sessionTimeoutMs; public CuratorClient(String namespace,String connStr,int retryInterval,int maxRetries,int connTimeoutMs,int sessionTimeoutMs){ this.namespace=namespace; this.connStr=connStr; this.retryInterval=retryInterval; this.maxRetries=maxRetries; this.connTimeoutMs=connTimeoutMs; this.sessionTimeoutMs=sessionTimeoutMs; connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs); } public CuratorClient(String namespace,String connStr,int retryInterval,int maxRetries){ this.namespace=namespace; this.connStr=connStr; this.retryInterval=retryInterval; this.maxRetries=maxRetries; this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT; this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT; connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs); } public CuratorClient(String namespace,String connStr){ this.namespace=namespace; this.connStr=connStr; this.retryInterval=Constants.ZK_RETRY_INTERVAL; this.maxRetries=Constants.ZK_MAX_RETRIES; this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT; this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT; connect(namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs); } public CuratorClient(String connStr){ this.namespace=Constants.ZK_NAMESPACE; this.connStr=connStr; this.retryInterval=Constants.ZK_RETRY_INTERVAL; this.maxRetries=Constants.ZK_MAX_RETRIES; this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT; this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT; connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs); } /** * 创建到zookeeper的连接 * @param connectionString zookeeper服务器连接字符串 格式: ip/域名:port 127.0.0.1:2181 * @param retryInterval 重试间隔 * @param maxRetries 重试次数 * @param connectionTimeoutMs 连接timeout时间 * @param sessionTimeoutMs session 失效时间 * @return */ public void connect(String namespace,String connStr, int retryInterval,int maxRetries, int connTimeoutMs,int sessionTimeoutMs){ ACLProvider aclProvider=new ACLProvider() { private List<ACL> aclList; @Override public List<ACL> getDefaultAcl() { if(aclList==null){ aclList=ZooDefs.Ids.CREATOR_ALL_ACL; aclList.clear(); aclList.add(new ACL(Perms.ALL, new Id("auth",Constants.ZK_AUTH))); } return aclList; } @Override public List<ACL> getAclForPath(String arg0) { return aclList; } }; curatorClient = CuratorFrameworkFactory.builder() .aclProvider(aclProvider) .authorization("digest", Constants.ZK_AUTH.getBytes()) .namespace(namespace) .connectString(connStr) .retryPolicy(new ExponentialBackoffRetry(retryInterval, maxRetries)) .connectionTimeoutMs(connTimeoutMs) .sessionTimeoutMs(sessionTimeoutMs) .build(); start(); } /** * 添加连接状态监听器 * @param listener */ public void addConnListener(ConnectionStateListener listener){ curatorClient.getConnectionStateListenable().addListener(listener); } public void start(){ if(curatorClient.getState()!=CuratorFrameworkState.STARTED){ curatorClient.start(); } } public void disconnect(){ CloseableUtils.closeQuietly(curatorClient); } public CuratorFrameworkState getState(){ return curatorClient.getState(); } /** * 创建全局可见的持久化节点 * @param path 节点路径 * @param payload 节点数据 * @throws Exception */ public void createNodes(String path, byte[] payload) throws Exception{ curatorClient.create().creatingParentsIfNeeded().forPath(path, payload); } /** * 创建全局可见的持久化节点 * @param path 节点路径 * @throws Exception */ public void createNodes(String path) throws Exception{ curatorClient.create().creatingParentsIfNeeded().forPath(path); } /** * 创建全局可见的临时节点 * @param path 节点路径 * @param payload 节点数据 * @throws Exception */ public void createEphemeral(String path, byte[] payload) throws Exception{ curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, payload); } /** * 创建全局可见的临时节点 * @param path * @throws Exception */ public String createEphemeral(String path) throws Exception{ return curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path); } /** * 临时顺序节点 * @param path * @return * @throws Exception */ public String createEphemeralSeq(String path) throws Exception{ return curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path); } /** * 写入节点数据 * @param path 节点路径 * @param payload 节点数据 * @throws Exception */ public void setData(String path, byte[] payload) throws Exception{ curatorClient.setData().forPath(path,payload); } /** * 获取节点数据 * @param path 节点路径 * @return * @throws Exception */ public String getData(String path) throws Exception{ String result=null; if(isNodeExists(path)){ result=new String(curatorClient.getData().forPath(path),Constants.ZK_CHARSET); } return result; } /** * 获取节点数据 * @param path 节点路径 * @return * @throws Exception */ public List<String> getChildData(String path) throws Exception{ List<String> resultList=null; if(isNodeExists(path)){ resultList=curatorClient.getChildren().forPath(path); } return resultList; } /** * 判断节点是否存在 * @param path * @return */ public boolean isNodeExists(String path){ boolean flag=false; try { flag=curatorClient.checkExists().forPath(path)!=null?true:false; } catch (Exception e) { e.printStackTrace(); } return flag; } /** * 删除指定节点 * @param path * @throws Exception */ public void delNode(String path) throws Exception{ curatorClient.delete().guaranteed().inBackground().forPath(path); } /** * 监控子节点的变化:添加、修改、删除 * @param path * @param childCacheListener * @return * @throws Exception */ public PathChildrenCache pathChildrenCache(String path,PathChildrenCacheListener childCacheListener) throws Exception { final PathChildrenCache childCache = new PathChildrenCache(curatorClient, path, true); childCache.getListenable().addListener(childCacheListener); return childCache; } /** * 监控节点的变化:增加、修改 * @param path * @param nodeCacheListener * @return */ public NodeCache nodeCache(String path,NodeCacheListener nodeCacheListener) { final NodeCache nodeCache = new NodeCache(curatorClient, path); nodeCache.getListenable().addListener(nodeCacheListener); return nodeCache; } /** * 监控节点及子节点的变化:增加、修改、删除 * @param path * @param treeCacheListener * @return */ public TreeCache nodeCache(String path,TreeCacheListener treeCacheListener) { final TreeCache treeCache = new TreeCache(curatorClient, path); treeCache.getListenable().addListener(treeCacheListener); return treeCache; } public void addCuratorListener(CuratorListener curatorListener){ curatorClient.getCuratorListenable().addListener(curatorListener); } }