欢迎您访问程序员文章站!本站旨在为大家提供和分享程序员计算机编程知识!
您现在的位置是: 首页

300行的动态配置中心客户端

程序员文章站 2024-03-20 22:45:04
...

300行的动态配置中心客户端

在目前的服务化(service化)架构中,配置中心占据着越来越重要的地位,特别是在docker容器环境中。

去年我们公司也需要一个这样的配置中心,于是借鉴了国内外(如阿里、百度)的配置中心方案,准备自己也写一个。

一开始设计得非常复杂,可能需要四五个人开发一个月才能完成。

但是突然来了灵感,一下午就写完了,而且已经在线上运行了6个月,有80多台机器共同使用,没有出过一点问题。

不会挂的原因很简单:所有的配置都保存在zookeeper的node上,客户端启动时会把node上的配置加载到内存并写入本地文件(如果node还不存在,则读取本地文件并把内容写入zookeeper的node)。读取配置时只访问内存,所以即使zookeeper出现问题,也和客户端一点都不相关。zookeeper的连接使用的是curator框架,并且设置为每3秒重试连接一次,无限重连。

白话文简单明了,各位线上出了问题请找我。300行的动态配置中心客户端

package com.jenkin.ec;


import net.sf.json.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.zookeeper.CreateMode;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;


/**
 * Created by jenkin.z.chen on 2017/8/28.
 *
 * <p>Dynamic configuration client backed by a ZooKeeper node.
 *
 * <p>The configuration is kept in memory, mirrored to a local XML file and
 * stored as a JSON object in a per-machine znode
 * ({@code <listenPath>/<host>:<port>}). A {@link NodeCache} listener reloads
 * the in-memory copy and rewrites the local file whenever the znode changes.
 *
 * <p>All state lives in static fields, so there is effectively one
 * configuration per JVM. Reads take the shared read lock and mutations take
 * the write lock of a single {@link ReentrantReadWriteLock}.
 */
public class ZkConf extends Configuration {
    private static final Log logger = LogFactory.getLog(ZkConf.class);
    /** Charset used for every byte/String conversion of the znode payload. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");
    /** Shared JSON mapper; built once instead of once per {@link #getMap(String)} call. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static final Lock readLock = readWriteLock.readLock();
    private static final Lock writeLock = readWriteLock.writeLock();
    private static CuratorFramework client;
    private static Configuration configuration;
    private static String listenPath;
    /** Volatile so the unlocked fast path in {@link #getInstance} sees a fully published instance. */
    private static volatile ZkConf zkConf;
    private static String absolutePath;
    private static int port;

    /**
     * Returns the process-wide instance, creating it on the first call.
     * Later calls return the existing instance and ignore their arguments.
     *
     * @param zkAddress    ZooKeeper connect string
     * @param listenPath   parent znode path; the watched node is
     *                     {@code listenPath/<local host address>:<port>}
     * @param port         distinguishes several processes on the same machine
     * @param absolutePath local XML file that is read at start-up when the znode
     *                     has no data, and rewritten whenever the znode changes
     * @return the shared instance
     * @throws RuntimeException if {@code zkAddress} or {@code listenPath} is blank,
     *                          or the local file is missing or not readable/writable
     */
    public static ZkConf getInstance(String zkAddress,String listenPath,int port,String absolutePath){
        ZkConf existing = zkConf;
        if(existing!=null){
            return existing;
        }
        writeLock.lock();
        try{
            // Re-check under the lock: another thread may have created the
            // instance between the unlocked check and lock acquisition.
            if(zkConf==null){
                zkConf = new ZkConf(zkAddress,listenPath,port,absolutePath);
            }
            return zkConf;
        }finally {
            writeLock.unlock();
        }
    }

    /**
     * Connects to ZooKeeper, loads the initial configuration and starts the
     * daemon thread that registers the znode listener.
     */
    private ZkConf(String zkAddress,String listenerPath,int port,String absolutePath){
        if(StringUtils.isBlank(zkAddress)||StringUtils.isBlank(listenerPath)){
            throw new RuntimeException("[ZkError] zkAddress|listenerPath is null");
        }
        // loadDefaults=false: start empty so that only properties coming from the
        // local file or from the znode are held (and later published to ZooKeeper).
        configuration = new Configuration(false);
        ZkConf.listenPath = listenerPath+"/"+getLocalHost()+":"+port;
        ZkConf.absolutePath = absolutePath;
        ZkConf.port = port;
        client = CuratorFrameworkFactory.builder().connectString(zkAddress)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(3000)
                // Retry every 3 seconds, practically forever.
                .retryPolicy(new RetryNTimes(Integer.MAX_VALUE,3000))
                .build();
        client.start();
        init();
        Thread listenThread = new Thread(new ListenRunnable());
        listenThread.setDaemon(true);
        listenThread.start();
    }

    /**
     * Loads the initial configuration.
     *
     * <p>If the znode already holds data, that data replaces the in-memory
     * configuration and is written to the local file. Otherwise the local file
     * is parsed and its content is published to the znode (creating the node
     * and its parents if necessary).
     *
     * <p>ZooKeeper failures are logged and do not propagate.
     *
     * @throws RuntimeException if the local file does not exist or is not both
     *                          readable and writable
     */
    public void init(){
        if(StringUtils.isBlank(absolutePath)){
            throw new RuntimeException("absolutePath not exist");
        }
        File file = new File(absolutePath);
        if(!file.exists()){
            throw new RuntimeException("absolutePath not exist");
        }
        // The file is both parsed (read) and rewritten (write), so either
        // missing permission is fatal.
        if(!file.canWrite()||!file.canRead()){
            throw new RuntimeException("file can't read or can't write");
        }
        try {
            boolean nodeExists = client.checkExists().forPath(listenPath)!=null;
            byte[] remote = nodeExists ? client.getData().forPath(listenPath) : null;
            if(remote!=null){
                writeConf(new String(remote,UTF_8));
            }else{
                parseXml();
                byte[] local = conf2Json().getBytes(UTF_8);
                if(nodeExists){
                    // Node is present but carries no data: fill it rather than
                    // attempting a create() that would fail with NodeExists.
                    client.setData().forPath(listenPath,local);
                }else{
                    client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(listenPath,local);
                }
            }
        } catch (Exception e) {
            logger.error("[ZkError] init configuration from zookeeper has ERROR",e);
        }
    }

    /**
     * @param name property name
     * @return the value held by the shared configuration, or {@code null} if absent
     */
    public String get(String name){
        readLock.lock();
        try{
            return configuration.get(name);
        }finally {
            readLock.unlock();
        }
    }

    /**
     * @param name         property name
     * @param defaultValue value returned when the property is absent
     * @return the property value, or {@code defaultValue} if absent
     */
    public String get(String name,String defaultValue){
        readLock.lock();
        try{
            String value = configuration.get(name);
            return value==null ? defaultValue : value;
        }finally {
            readLock.unlock();
        }
    }

    /**
     * Splits the property value on commas.
     *
     * @param name property name
     * @return the comma-separated parts, or an empty array if the property is absent
     */
    public String[] getStrings(String name){
        readLock.lock();
        try{
            String value = configuration.get(name);
            if(value==null){
                // Previously this dereferenced null and threw a NullPointerException.
                return new String[0];
            }
            return value.split(",");
        }finally {
            readLock.unlock();
        }
    }

    /**
     * @param name property name
     * @return the property value parsed by Hadoop's
     *         {@code StringUtils.getStringCollection}; an empty collection if
     *         parsing fails
     */
    public Collection<String> getStringCollection(String name){
        readLock.lock();
        try{
            return org.apache.hadoop.util.StringUtils.getStringCollection(configuration.get(name));
        }catch (Exception e){
            logger.error("ZkConf getStringCollection has error!!!",e);
            return new ArrayList<String>();
        }
        finally {
            readLock.unlock();
        }
    }

    /**
     * @param name property name
     * @return the result of {@code Configuration.onlyKeyExists(name)} on the
     *         shared configuration
     */
    public boolean containsKey(String name){
        readLock.lock();
        try{
            return configuration.onlyKeyExists(name);
        }finally {
            readLock.unlock();
        }
    }

    /**
     * @param name         property name
     * @param defaultValue value returned when the property is absent or not an int
     * @return the property parsed as an {@code int}, or {@code defaultValue}
     */
    public int getInt(String name,int defaultValue){
        readLock.lock();
        try{
            String value = configuration.get(name);
            if(value==null){
                return defaultValue;
            }
            try{
                return Integer.parseInt(value);
            }catch (NumberFormatException e){
                return defaultValue;
            }
        }finally {
            readLock.unlock();
        }
    }

    /**
     * @param name         property name
     * @param defaultValue value returned when the property is absent or not a long
     * @return the property parsed as a {@code long}, or {@code defaultValue}
     */
    public long getLong(String name,long defaultValue){
        readLock.lock();
        try {
            String value = configuration.get(name);
            if(value==null){
                return defaultValue;
            }
            try{
                // Parse as long: parsing as Integer rejected every value
                // outside the int range and silently returned the default.
                return Long.parseLong(value);
            }catch (NumberFormatException e){
                return defaultValue;
            }
        }finally {
            readLock.unlock();
        }
    }

    /**
     * @param name         property name
     * @param defaultValue value returned when the property is absent
     * @return {@code true} only if the value equals "true" ignoring case;
     *         {@code defaultValue} if the property is absent
     */
    public boolean getBoolean(String name,boolean defaultValue){
        readLock.lock();
        try{
            String value = configuration.get(name);
            return value==null ? defaultValue : Boolean.parseBoolean(value);
        }finally {
            readLock.unlock();
        }
    }

    /**
     * Parses the property value as JSON into a nested map.
     *
     * @param name property name
     * @return the parsed map, or an empty map if the property is absent or
     *         cannot be parsed
     */
    @SuppressWarnings("unchecked") // readValue(..., Map.class) yields a raw Map; element types are the caller's claim
    public <F> Map<String,Map<String,F>> getMap(String name){
        readLock.lock();
        try{
            String value = configuration.get(name);
            if(value==null){
                return new HashMap<String,Map<String,F>>();
            }
            try{
                return OBJECT_MAPPER.readValue(value,Map.class);
            }catch(Exception e){
                logger.warn("[ZkConf] name:"+name+" can not be parsed as a map",e);
                return new HashMap<String,Map<String,F>>();
            }
        }finally {
            readLock.unlock();
        }
    }

    /**
     * Sets a property in the in-memory configuration only; nothing is written
     * to ZooKeeper or to the local file by this method.
     *
     * @param key   property name
     * @param value property value
     */
    public void set(String key,String value){
        writeLock.lock();
        try{
            logger.info("[ZkConf] name:"+key+" has exchange,value:"+value);
            configuration.set(key,value);
        }finally {
            writeLock.unlock();
        }
    }

    /** Removes every property from the in-memory configuration. */
    public void clear(){
        writeLock.lock();
        try{
            logger.info("[ZkConf] Conf will to clear");
            configuration.clear();
        }finally {
            writeLock.unlock();
        }
    }

    /**
     * Replaces the in-memory configuration with the entries of a JSON object
     * and rewrites the local XML file to match.
     *
     * @param data JSON object text; every key becomes a property
     * @throws IOException if the local file cannot be written
     */
    private void writeConf(String data) throws IOException {
        // Parse before touching any state so malformed JSON cannot leave the
        // in-memory configuration wiped.
        JSONObject json = JSONObject.fromObject(data);
        writeLock.lock();
        try{
            configuration.clear();
            Iterator<?> keys = json.keys();
            while(keys.hasNext()){
                String key = String.valueOf(keys.next());
                set(key,json.getString(key));
            }
            FileOutputStream out = new FileOutputStream(new File(absolutePath));
            try{
                configuration.writeXml(out);
            }finally {
                // Always release the file handle, even when writeXml fails.
                out.close();
            }
        }finally {
            writeLock.unlock();
        }
    }

    /** @return the in-memory configuration serialized as a flat JSON object */
    private String conf2Json(){
        readLock.lock();
        try{
            JSONObject json = new JSONObject();
            for(Map.Entry<String,String> entry : configuration){
                json.put(entry.getKey(),entry.getValue());
            }
            return json.toString();
        }finally {
            readLock.unlock();
        }
    }

    /**
     * Starts a {@link NodeCache} on the watched znode. On every change the new
     * data replaces the in-memory configuration; if the node has no data, or is
     * reported as absent, the in-memory configuration is cleared.
     *
     * <p>Failures while starting the cache are logged and do not propagate.
     */
    public void listen() {
        try{
            final NodeCache nodeCache = new NodeCache(client,listenPath);
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                public void nodeChanged() throws Exception {
                    // The cache reports no current data when the node is gone,
                    // so guard before dereferencing it.
                    ChildData current = nodeCache.getCurrentData();
                    byte[] data = current==null ? null : current.getData();
                    if(data!=null){
                        writeConf(new String(data,UTF_8));
                    }
                    else{
                        clear();
                    }
                }
            });
            nodeCache.start();
        }catch (Exception e){
            logger.warn("[ZkError] start zookeeper listen has ERROR",e);
        }
    }

    /**
     * @return the local host's IP address string, or {@code null} if it cannot
     *         be resolved
     */
    public String getLocalHost()
    {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            logger.warn("[ZkConf] can not resolve local host address",e);
            return null;
        }
    }

    /**
     * @return an iterator over the entries of the shared configuration; the
     *         read lock is held only while the iterator is obtained
     */
    public Iterator<Map.Entry<String, String>> iterator(){
        readLock.lock();
        try{
            return configuration.iterator();
        }finally {
            readLock.unlock();
        }
    }

    /**
     * Body of the daemon thread started by the constructor: registers the
     * znode listener, then sleeps.
     */
    private class ListenRunnable implements Runnable{

        public void run() {
            listen();
            try {
                Thread.sleep(Integer.MAX_VALUE);
            } catch (InterruptedException e) {
                // Restore the interrupt flag for whoever owns this thread.
                Thread.currentThread().interrupt();
                logger.warn("[ZkConf] listen thread has been interrupted",e);
            }
        }
    }

    /**
     * Adds the local XML file as a resource of the shared configuration.
     *
     * <p>This must target {@code configuration}, the object every getter
     * reads. It is also called from the constructor, before the singleton
     * reference has been assigned.
     */
    public void parseXml(){
        writeLock.lock();
        try{
            configuration.addResource(new Path(absolutePath));
        }finally {
            writeLock.unlock();
        }
    }
}