300行的动态配置中心客户端
程序员文章站
2024-03-20 22:45:04
...
300行的动态配置中心客户端
配置中心在目前的服务化架构中占据着越来越重要的位置,特别是在 Docker 容器环境中。
去年我们公司也需要一个这样的配置中心,于是借鉴了国内外(如阿里、百度)的配置中心,准备自己也写一个。
一开始设计得非常复杂,可能需要四五个人开发一个月才能完成。
但是后来突然有了灵感,一个下午就写完了,而且已经在线上运行了6个月,80多台机器共同使用,没有出过一点问题。
不会挂的原因很简单:所有的配置都保存在 ZooKeeper 的节点上;客户端启动时,如果节点还不存在,会把本地配置文件加载到内存,然后写入 ZooKeeper 的节点。之后读取配置只访问本地内存,所以即使 ZooKeeper 出现问题,也不会影响客户端。ZooKeeper 的连接使用的是 Curator 框架,并且设置为每3秒重试一次,无限重连。
白话文简单明了,各位线上出了问题请找我。
package com.jenkin.ec;
import net.sf.json.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.zookeeper.CreateMode;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Created by jenkin.z.chen on 2017/8/28.
*/
public class ZkConf extends Configuration {
private static final Log logger = LogFactory.getLog(ZkConf.class);
private static CuratorFramework client;
private static Configuration configuration;
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
private static String listenPath;
private static ZkConf zkConf;
private static String absolutePath;
private static int port;
/**
* ZkAddress is zookeeper's address
* listenPath is monitor the zookeeper path
* Port is the main function is to distinguish the same directory machine machine
* absolutePath is Program start read file, and writes it to zookeeper
*
* @param zkAddress
* @param listenPath
* @param absolutePath
* @return
*/
public static ZkConf getInstance(String zkAddress,String listenPath,int port,String absolutePath){
if(zkConf==null){
writeLock.lock();
try{
zkConf = new ZkConf(zkAddress,listenPath,port,absolutePath);
return zkConf;
}finally {
writeLock.unlock();
}
}else{
return zkConf;
}
}
private ZkConf(String zkAddress,String listenerPath,int port,String absolutePath){
if(StringUtils.isBlank(zkAddress)||StringUtils.isBlank(listenerPath)){
throw new RuntimeException("[ZkError] zkAddress|listenerPath is null");
}
configuration = new Configuration();
configuration.clear();
this.listenPath = listenerPath+"/"+getLocalHost()+":"+port;
this.absolutePath = absolutePath;
this.port = port;
client = CuratorFrameworkFactory.builder().connectString(zkAddress)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.retryPolicy(new RetryNTimes(Integer.MAX_VALUE,3000))
.build();
client.start();
init();
Thread listenThread = new Thread(new ListenRunnable());
listenThread.setDaemon(true);
listenThread.start();
}
/**
*
*/
public void init(){
File file = new File(absolutePath);
if(!file.exists()){
throw new RuntimeException("absolutePath not exist");
}
if(!file.canWrite()&&!file.canRead()){
throw new RuntimeException("file can't read or can't write");
}
try {
if(client.checkExists().forPath(listenPath)!=null&&client.getData().forPath(listenPath)!=null){
writeConf(new String(client.getData().forPath(listenPath)));
}else{
parseXml();
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(listenPath,conf2Json().getBytes());
}
} catch (Exception e) {
e.printStackTrace();
}
}
public String get(String name){
readLock.lock();
try{
return configuration.get(name);
}finally {
readLock.unlock();
}
}
public String get(String name,String defaultValue){
readLock.lock();
try{
String valueString = configuration.get(name);
if(valueString==null){
return defaultValue;
}
return valueString;
}finally {
readLock.unlock();
}
}
public String[] getStrings(String name){
readLock.lock();
try{
String str = configuration.get(name);
return str.split(",");
}finally {
readLock.unlock();
}
}
public Collection<String> getStringCollection(String name){
readLock.lock();
try{
return org.apache.hadoop.util.StringUtils.getStringCollection(configuration.get(name));
}catch (Exception e){
logger.error("ZkConf getStringCollection has error!!!",e);
}
finally {
readLock.unlock();
}
return null;
}
public boolean containsKey(String name){
readLock.lock();
try{
return configuration.onlyKeyExists(name);
}finally {
readLock.unlock();
}
}
public int getInt(String name,int defaultValue){
readLock.lock();
try{
String valueString = configuration.get(name);
if(valueString==null){
return defaultValue;
}
else{
try{
int valueInt = Integer.valueOf(valueString);
return valueInt;
}catch (Exception e){
return defaultValue;
}
}
}finally {
readLock.unlock();
}
}
public long getLong(String name,long defaultValue){
readLock.lock();
try {
String valueString = configuration.get(name);
if(valueString==null){
return defaultValue;
}
else{
try{
long valueInt = Integer.valueOf(valueString);
return valueInt;
}catch (Exception e){
return defaultValue;
}
}
}finally {
readLock.unlock();
}
}
public boolean getBoolean(String name,boolean defaultValue){
readLock.lock();
try{
String valueString = configuration.get(name);
if(valueString==null){
return defaultValue;
}
else{
try{
boolean valueBoolean = Boolean.valueOf(valueString);
return valueBoolean;
}catch(Exception e){
return defaultValue;
}
}
}finally {
readLock.unlock();
}
}
public <F> Map<String,Map<String,F>> getMap(String name){
readLock.lock();
try{
String valueString = configuration.get(name);
if(valueString==null){
return new HashMap<String,Map<String,F>>();
}
else{
try{
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(valueString,Map.class);
}catch(Exception e){
return new HashMap<String,Map<String,F>>();
}
}
}finally {
readLock.unlock();
}
}
public void set(String key,String value){
writeLock.lock();
try{
logger.info("[ZkConf] name:"+key+" has exchange,value:"+value);
configuration.set(key,value);
}finally {
writeLock.unlock();
}
}
public void clear(){
writeLock.lock();
try{
logger.info("[ZkConf] Conf will to clear");
configuration.clear();
}finally {
writeLock.unlock();
}
}
private void writeConf(String data) throws IOException {
writeLock.lock();
try{
configuration.clear();
JSONObject jsonObject = JSONObject.fromObject(data);
Iterator iterator = jsonObject.keys();
while(iterator.hasNext()){
String key = String.valueOf(iterator.next());
set(key,jsonObject.getString(key));
}
configuration.writeXml(new FileOutputStream(new File(absolutePath)));
}finally {
writeLock.unlock();
}
}
private String conf2Json(){
readLock.lock();
try{
Iterator<Map.Entry<String, String>> iterator= configuration.iterator();
JSONObject jsonObject = new JSONObject();
while(iterator.hasNext()){
Map.Entry<String,String> entry = iterator.next();
jsonObject.put(entry.getKey(),entry.getValue());
}
return jsonObject.toString();
}finally {
readLock.unlock();
}
}
public void listen() {
try{
final NodeCache loadBalance = new NodeCache(client,listenPath);
loadBalance.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
if(loadBalance.getCurrentData().getData()!=null){
writeConf(new String(loadBalance.getCurrentData().getData()));
}
else{
clear();
}
}
});
loadBalance.start();
}catch (Exception e){
logger.warn("[ZkError] start zookeeper listen has ERROR");
}
}
public String getLocalHost()
{
InetAddress inetAddress = null;
String host = null;
try {
inetAddress = InetAddress.getLocalHost();
host = inetAddress.getHostAddress();
} catch (UnknownHostException e) {
e.printStackTrace();
}
return host;
}
public Iterator<Map.Entry<String, String>> iterator(){
return configuration.iterator();
}
private class ListenRunnable implements Runnable{
public void run() {
listen();
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void parseXml(){
zkConf.addResource(new Path(absolutePath));
}
}
上一篇: 路由过滤&路由引入实验练习
下一篇: iOS 库管理工具 CocoaPods
推荐阅读
-
SpringCloud分布式配置中心服务端与客户端
-
300行的动态配置中心客户端
-
SpringBoot2.0以上使用Spring Cloud Config配置中心【客户端刷新】(一)
-
Seata客户端使用配置中心
-
SpingCloud(H版&alibaba)框架开发教程-37 配置中心之客户端手动动态刷新
-
(11)springcloud配置中心--客户端
-
微服务搭建Spring Cloud配置中心【客户端】
-
SpringCloud学习-part38 配置中心之客户端与动态刷新
-
Websphere MQ Java/JMS 客户端的 SSL/AMS 配置 MQJMSIBMSSLJava
-
Consul-template的简单应用:配置中心,服务发现与健康监测 博客分类: Consul-templateConsul consulconsul template架构集群