基于zookeeper实现分布式配置中心(二)
上一篇(基于zookeeper实现分布式配置中心(一))讲述了zookeeper相关概念和工作原理。接下来根据zookeeper的特性,简单实现一个分布式配置中心。
配置中心的优势
1、各环境配置集中管理。
2、配置更改,实时推送,jvm环境变量及时生效。
3、依靠配置变更,动态扩展功能,减少二次上线带来的成本。
4、减少开发人员、运维人员修改配置带来的额外开销。
配置中心架构图
配置中心功能
1、配置管理平台中,操作人员可以创建项目所属系统、应用名称、实例名称、配置分组等信息。
2、配置管理平台中,操作人员可以上传配置文件,对属性有增、删、改、查的操作。
3、配置内容通过配置管理平台后台服务进行持久化(保存到数据库中)。
4、操作人员通过配置平台进行推送操作,将配置推送到zk集群相应结点(/cfgcenter/系统名称/应用名称/实例名称/分组名称)。
5、配置中心客户端监听zk集群中对应结点数据发生变化,读取变更后的内容,解析内容,刷新本地备份(分布式容灾)和Spring环境变量。
6、配置中心客户端如果和zk集群丢失连接,将加载本地本分配置到Spring环境变量。
7、配置中心客户端重新和zk集群建立连接,从zk集群中拉取最新配置内容,解析配置,刷新本地备份(分布式容灾)和Spring环境变量。
8、配置中心客户端将Spring环境变量刷新之后,动态刷新依赖配置中心配置的bean。
配置中心代码视图
配置中心客户端设计解析
配置中心客户端初始化
@Component public class CfgcenterInit implements ApplicationContextInitializer<ConfigurableWebApplicationContext>, ApplicationListener<ApplicationEvent> { private static Logger LOGGER = LoggerFactory.getLogger(CfgcenterInit.class); @Override public void onApplicationEvent(ApplicationEvent event) { if (event instanceof ContextRefreshedEvent) { LOGGER.info("初始化配置中心客户端监听器..."); ZKClient.getInstance() .init(); } else if (event instanceof RefreshEvent) { ZKClient.getInstance() .getAeb() .post(event); } else if (event instanceof ContextClosedEvent) { if (null != ZKClient.getInstance().getCw()) { ZKClient.getInstance() .getCw() .close(); } } } @Override public void initialize(ConfigurableWebApplicationContext cac) { try { ZookeeperProperties zookeeperProperties = ConfigurationBinder .withPropertySources(cac.getEnvironment()) .bind(ZookeeperProperties.class); if (!zookeeperProperties.isEnabled()) { LOGGER.info("未开启配置中心客戶端..."); return; } ZKClient.getInstance() .binding( zookeeperProperties , new ZookeeperConfigProperties() , cac ); } catch (Exception e) { LOGGER.error("配置中心客户端初始化异常...", e); } } }
1、ApplicationContextInitializer#initialize方法中,获取zk连接信息配置,如果开启配置中心客户端,将ZookeeperProperties(zk集群连接信息)、ZookeeperConfigProperties(客户端监听zk集群结点信息)、ConfigurableWebApplicationContext (应用上下文)绑定到ZKClient实例中去。
2、ApplicationListener#onApplicationEvent方法中监听ContextRefreshedEvent(初始化配置中心客户端监听器)、RefreshEvent(配置刷新事件,通过guava的事件总线进行推送)、ContextClosedEvent(关闭配置中心客户端资源)。
配置中心客户端监听器
public class ConfigWatcher implements Closeable, TreeCacheListener { private static Logger LOGGER = LoggerFactory.getLogger(ConfigWatcher.class); private AtomicBoolean running = new AtomicBoolean(false); private String context; private CuratorFramework source; private HashMap<String, TreeCache> caches; public ConfigWatcher(String context, CuratorFramework source) { this.context = context; this.source = source; } public void start() { if (this.running.compareAndSet(false, true)) { this.caches = new HashMap<>(); if (!context.startsWith("/")) { context = "/" + context; } try { TreeCache cache = TreeCache.newBuilder(this.source, context).build(); cache.getListenable().addListener(this); cache.start(); this.caches.put(context, cache); // no race condition since ZookeeperAutoConfiguration.curatorFramework // calls curator.blockUntilConnected } catch (KeeperException.NoNodeException e) { // no node, ignore } catch (Exception e) { LOGGER.error("Error initializing listener for context " + context, e); } } } @Override public void close() { if (this.running.compareAndSet(true, false)) { for (TreeCache cache : this.caches.values()) { cache.close(); } this.caches = null; } } @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) { TreeCacheEvent.Type eventType = event.getType(); switch (eventType) { case INITIALIZED: LOGGER.info("配置中心客户端同步服务端状态完成..."); refreshEnvAndBeans(event); break; case NODE_REMOVED: case NODE_UPDATED: refreshEnvAndBeans(event); break; case CONNECTION_SUSPENDED: case CONNECTION_LOST: LOGGER.info("配置中心客户端与服务端连接异常..."); break; case CONNECTION_RECONNECTED: LOGGER.info("配置中心客户端与服务端重新建立连接..."); break; } } private void refreshEnvAndBeans(TreeCacheEvent event) { //刷新环境变量 ZKClient.getInstance() .refreshEnvironment(); //刷新Bean ZKClient.getInstance() .getAep() .publishEvent( new RefreshEvent(this, event, getEventDesc(event)) ); } private String getEventDesc(TreeCacheEvent event) { StringBuilder out = new StringBuilder(); out.append("type=").append(event.getType()); TreeCacheEvent.Type eventType = event.getType(); if (eventType == NODE_UPDATED || eventType == NODE_REMOVED) { out.append(", path=").append(event.getData().getPath()); byte[] data = event.getData().getData(); if (data != null) { out.append(", data=").append(new String(data, StandardCharsets.UTF_8)); } } return out.toString(); } }
1、通过TreeCache监听路径/cfgcenter/系统名称/应用名称/实例名称/分组名称(该路径下可能会存在多个子节点,每个子节点对应一份配置,每一份配置大小不能超过64k)。
2、TreeCache监听事件类型如下
- INITIALIZED(完成同步服务端状态,同步状态【NODE_REMOVED、 NODE_UPDATED、CONNECTION_RECONNECTED】之后触发)
- NODE_REMOVED(结点移除触发)
- NODE_UPDATED(结点数据更新触发)
- CONNECTION_SUSPENDED(连接丢失触发)
- CONNECTION_LOST(完全丢失连接触发)
- CONNECTION_RECONNECTED(重新连接触发)
3、监听到INITIALIZED、NODE_UPDATED、NODE_REMOVED事件之后,执行refreshEnvAndBeans方法,刷新spring环境变量,同时刷新spring容器相关的Bean。
配置中心客户端刷新spring环境变量
public class ZookeeperPropertySourceLocator { public static final String ZOOKEEPER_PREPERTY_SOURCE_NAME = "cfg-zookeeper"; private ZookeeperConfigProperties properties; private CuratorFramework curator; private static Logger LOGGER = LoggerFactory.getLogger(ZookeeperPropertySourceLocator.class); public ZookeeperPropertySourceLocator(CuratorFramework curator, ZookeeperConfigProperties properties) { this.curator = curator; this.properties = properties; } public String getContext() { return this.properties.getContext(); } public PropertySource getCfgcenterPropertySource(Environment environment) { ConfigurableEnvironment env = (ConfigurableEnvironment) environment; return env.getPropertySources().get(ZOOKEEPER_PREPERTY_SOURCE_NAME); } public void locate(Environment environment) { if (environment instanceof ConfigurableEnvironment) { ConfigurableEnvironment env = (ConfigurableEnvironment) environment; String context = properties.getContext(); CompositePropertySource composite = new CompositePropertySource(ZOOKEEPER_PREPERTY_SOURCE_NAME); try { PropertySource propertySource = create(context); composite.addPropertySource(propertySource); if (null != env.getPropertySources().get(ZOOKEEPER_PREPERTY_SOURCE_NAME)) { LOGGER.info("替换PropertySource: " + ZOOKEEPER_PREPERTY_SOURCE_NAME); env.getPropertySources().replace(ZOOKEEPER_PREPERTY_SOURCE_NAME, composite); } else { LOGGER.info("添加PropertySource: " + ZOOKEEPER_PREPERTY_SOURCE_NAME); env.getPropertySources().addFirst(composite); } } catch (Exception e) { if (this.properties.isFailFast()) { ReflectionUtils.rethrowRuntimeException(e); } else { LOGGER.error("Unable to load zookeeper config from " + context, e); } } } } @PreDestroy public void destroy() { } private void backupZookeeperPropertySource(ZookeeperPropertySource zps) { String backupDir = BASE_BACKUP_DIR + this.properties.getContext(); String backupFile = String.format("%s/%s", backupDir, APP_NAME + ".properties"); File bakFile = new File(backupFile); StringBuilder data = new StringBuilder(); for (String propertyName : zps.getPropertyNames()) { data.append(propertyName) .append("=") .append(zps.getProperty(propertyName)) .append(System.lineSeparator()); } try { FileUtils.writeStringToFile(bakFile, data.toString(), Charsets.UTF_8); LOGGER.info("配置中心客户端刷新本地备份完成, path: " + backupDir); } catch (IOException e) { LOGGER.error("配置中心客户端刷新本地备份异常..., path: " + backupDir, e); } } private PropertySource<CuratorFramework> create(String context) { ZookeeperPropertySource zps; if (ZKClient.getInstance().isConnected()) { zps = new ZookeeperPropertySource(context, this.curator, false); this.backupZookeeperPropertySource(zps); } else { zps = new ZookeeperPropertySource(context, this.curator, true); } return zps; } }
ZookeeperPropertySourceLocator会创建ZookeeperPropertySource,然后放入Spring的Environment变量中。如果配置中心客户端和zk集群处于连接状态,加载完ZookeeperPropertySource之后,备份到本地。
public class ZookeeperPropertySource extends AbstractZookeeperPropertySource { private static Logger LOGGER = LoggerFactory.getLogger(ZookeeperPropertySource.class); private Map<String, String> properties = new LinkedHashMap<>(); public ZookeeperPropertySource(String context, CuratorFramework source, boolean backup) { super(context, source); //加载本地配置 if (backup) { String backupDir = BASE_BACKUP_DIR + this.getContext(); String backupFile = String.format("%s/%s", backupDir, APP_NAME + ".properties"); try { InputStream is = FileUtils.openInputStream(new File(backupFile)); InputStreamReader isr = new InputStreamReader(is); Properties properties = new Properties(); properties.load(isr); properties.forEach((k, v) -> this.properties.put((String) k, (String) v)); } catch (Exception e) { LOGGER.error("配置中心客户端本地配置加载异常...", e); } } //加载远程配置 else { findProperties(this.getContext(), null); } } @Override public Object getProperty(String name) { return this.properties.get(name); } private byte[] getPropertyBytes(String fullPath) { try { byte[] bytes = null; try { bytes = this.getSource().getData().forPath(fullPath); } catch (KeeperException e) { if (e.code() != KeeperException.Code.NONODE) { throw e; } } return bytes; } catch (Exception exception) { ReflectionUtils.rethrowRuntimeException(exception); } return null; } @Override public String[] getPropertyNames() { Set<String> strings = this.properties.keySet(); return strings.toArray(new String[strings.size()]); } private void findProperties(String path, List<String> children) { try { LOGGER.info("entering findProperties for path: " + path); if (children == null) { children = getChildren(path); } if (children == null || children.isEmpty()) { return; } for (String child : children) { String childPath = path + "/" + child; List<String> childPathChildren = getChildren(childPath); byte[] bytes = getPropertyBytes(childPath); if (!ArrayUtils.isEmpty(bytes)) { registerKeyValue(childPath, new String(bytes, Charset.forName("UTF-8"))); } // Check children even if we have found a value for the current znode findProperties(childPath, childPathChildren); } LOGGER.info("leaving findProperties for path: " + path); } catch (Exception exception) { ReflectionUtils.rethrowRuntimeException(exception); } } private void registerKeyValue(String path, String value) { String key = sanitizeKey(path); LOGGER.info(String.format("配置中心客户端解析配置节点(%s),数据:%s", key, value)); try { Properties properties = new Properties(); properties.load(new StringReader(value)); properties.forEach((k, v) -> this.properties.put((String) k, (String) v)); } catch (IOException e) { LOGGER.info(String.format("配置中心客户端解析配置节点(%s)异常...", key)); } } private List<String> getChildren(String path) throws Exception { List<String> children = null; try { children = this.getSource().getChildren().forPath(path); } catch (KeeperException e) { if (e.code() != KeeperException.Code.NONODE) { throw e; } } return children; } }
ZookeeperPropertySource通过构造参数backup来判断是加载zk集群中的配置还是本地备份配置。
配置中心客户端刷新Spring容器Bean
public abstract class BaseCfgcenterBean implements InitializingBean { private static Logger LOGGER = LoggerFactory.getLogger(BaseCfgcenterBean.class); @PostConstruct public void init() { //注册到时间总线中 ZKClient.getInstance() .getAeb() .register(this); } /** * z * 绑定自身目标 **/ protected void doBind() { Class<? extends BaseCfgcenterBean> clazz = this.getClass(); if (org.springframework.util.ClassUtils.isCglibProxy(this)) { clazz = (Class<? extends BaseCfgcenterBean>) AopUtils.getTargetClass(this); } BaseCfgcenterBean target = binding(clazz, this.getDefaultResourcePath()); this.copyProperties(target); } private void copyProperties(BaseCfgcenterBean target) { ReflectionUtils.doWithFields(this.getClass(), field -> { field.setAccessible(true); field.set(this, field.get(target)); }, field -> AnnotatedElementUtils.isAnnotated(field, ConfigField.class)); } /** * 绑定其他目标 * * @param clazz 目标类 **/ protected <T> T doBind(Class<T> clazz) { T target = binding(clazz, this.getDefaultResourcePath()); if (target instanceof InitializingBean) { try { ((InitializingBean) target).afterPropertiesSet(); } catch (Exception e) { LOGGER.error(String.format("属性初始化失败[afterPropertiesSet], class=%s", ClassUtils.getSimpleName(clazz), e)); } } return target; } private <T> T binding(Class<T> clazz, String defaultResourcePath) { Optional<PropertySource> propertySource = Optional.empty(); //加载配置中心配置 if (ZKClient.getInstance().isZkInit()) { propertySource = Optional.ofNullable( ZKClient.getInstance() .resolvePropertySource() ); } //加载本地配置 else { Optional<ResourcePropertySource> resourcePropertySource = ResourceUtils.getResourcePropertySource(defaultResourcePath); if (resourcePropertySource.isPresent()) { propertySource = Optional.ofNullable(resourcePropertySource.get()); } } if (propertySource.isPresent()) { T target; try { target = ConfigurationBinder .withPropertySources(propertySource.get()) .bind(clazz); } catch (Exception e) { LOGGER.error(String.format("属性绑定失败, class=%s", ClassUtils.getSimpleName(clazz)), e); return null; } return target; } return null; } @Override public void afterPropertiesSet() { Class<?> target = this.getClass(); if (AopUtils.isAopProxy(this)) { target = AopUtils.getTargetClass(this); } LOGGER.info(String.format("%s->%s模块引入配置中心%s..." , this.getModuleName() , ClassUtils.getSimpleName(target) , (ZKClient.getInstance() .isConnected() ? "生效" : "无效") )); } public String getModuleName() { return StringUtils.EMPTY; } @Subscribe public void listenRefreshEvent(RefreshEvent refreshEvent) { this.afterPropertiesSet(); LOGGER.info(refreshEvent.getEventDesc()); this.refresh(); } //通过事件进行刷新 protected void refresh() { this.doBind(); } //获取本地配置默认路径 protected abstract String getDefaultResourcePath(); }
1、对象自身实现guava事件总线监听,监听RefreshEvent事件,触发对象属性刷新操作。
2、对象初始化时,注册自身目标到guava的事件总线对象中。
3、对象属性刷新,获取到PropertySource对象(配置中心配置或者项目自身静态配置),通过ConfigurationBinder工具类将配置重新绑定的对象属性。
配置管理平台接口
@RestController @RequestMapping("cfg") public class CfgController { private static Logger LOGGER = LoggerFactory.getLogger(CfgController.class); private static final String ZK_PATH_PATTERN0 = "/wmhcfg/projects/%s/%s"; private static final String ZK_PATH_PATTERN1 = ZK_PATH_PATTERN0 + "/%s"; private static final String ZK_PATH_PATTERN = ZK_PATH_PATTERN1 + "/%s"; @Autowired private CfgMapper mapper; @GetMapping(value = "/search", produces = MediaType.TEXT_PLAIN_VALUE) public String findCfgContents(@RequestBody @Validated SearchVO searchVO , @RequestParam(required = false) String cfgId) { List<CfgRecord> records = mapper.findRecords(searchVO); if (CollectionUtils.isEmpty(records)) { return StringUtils.EMPTY; } if (StringUtils.isNotBlank(cfgId)) { records = records.stream().filter(record -> cfgId.equals(record.getCfgId())).collect(Collectors.toList()); } StringBuilder response = new StringBuilder(); Properties properties = new Properties(); records.forEach(record -> { try { properties.clear(); properties.load(new StringReader(record.getCfgContent())); properties.forEach((key, value) -> response.append(key) .append("=") .append(value) .append(System.lineSeparator()) .append(System.lineSeparator()) ); } catch (IOException e) { LOGGER.error("配置解析异常...", e); } }); return response.toString(); } @PostMapping(value = "/send/{systemId}/{appId}/{groupId}/{cfgId}", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public BaseResponse sendCfgContent(@RequestBody String cfgContent , @PathVariable String systemId , @PathVariable String appId , @PathVariable String groupId , @PathVariable String cfgId) { BaseResponse baseResponse = new BaseResponse(); baseResponse.setRestStatus(RestStatus.SUCCESS); SearchVO searchVO = new SearchVO(); searchVO.setSystemId(systemId); searchVO.setAppId(appId); searchVO.setGroupId(groupId); List<CfgRecord> records = mapper.findRecords(searchVO); CfgRecord record = null; if (!CollectionUtils.isEmpty(records)) { for (CfgRecord cfgRecord : records) { if (cfgId.equals(cfgRecord.getCfgId())) { record = cfgRecord; record.setCfgContent(cfgContent); break; } } } if (null == record) { record = new CfgRecord(); record.setSystemId(systemId); record.setAppId(appId); record.setGroupId(groupId); record.setCfgId(cfgId); record.setCfgContent(cfgContent); } StringBuilder cfgContentSB = new StringBuilder(); Properties properties = new Properties(); try { properties.load(new StringReader(record.getCfgContent())); } catch (IOException e) { LOGGER.error("配置解析异常...", e); baseResponse.setErrors(e.getMessage()); baseResponse.setRestStatus(RestStatus.FAIL_50001); return baseResponse; } properties.forEach((key, value) -> cfgContentSB.append(key) .append("=") .append(value) .append(System.lineSeparator()) ); record.setCfgContent(cfgContentSB.toString()); if (null == record.getId()) { mapper.insertRecord(record); } else { mapper.updateRecord(record); } return baseResponse; } @PostMapping(value = "/push", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public BaseResponse pushCfgContent(@RequestBody @Validated PushVO pushVO) { BaseResponse baseResponse = new BaseResponse(); baseResponse.setRestStatus(RestStatus.SUCCESS); String path = String.format(ZK_PATH_PATTERN , pushVO.getSystemId() , pushVO.getAppId() , pushVO.getGroupId() , pushVO.getCfgId() ); try { SearchVO searchVO = new SearchVO(); searchVO.setSystemId(pushVO.getSystemId()); searchVO.setAppId(pushVO.getAppId()); searchVO.setGroupId(pushVO.getGroupId()); List<CfgRecord> records = mapper.findRecords(searchVO); StringBuilder cfgContent = new StringBuilder(); records.forEach(record -> cfgContent.append(record.getCfgContent()).append(System.lineSeparator())); if (!ZKHelper.setData(path, cfgContent.toString().getBytes())) { baseResponse.setRestStatus(RestStatus.FAIL_50001); } } catch (Exception e) { LOGGER.error("配置推送异常...", e); baseResponse.setRestStatus(RestStatus.FAIL_50001); baseResponse.setErrors(e.getMessage()); } return baseResponse; } @PostMapping(value = "/create", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public BaseResponse createCfg(@RequestBody @Validated PushVO pushVO) { BaseResponse baseResponse = new BaseResponse(); String path = String.format(ZK_PATH_PATTERN , pushVO.getSystemId() , pushVO.getAppId() , pushVO.getGroupId() , pushVO.getCfgId() ); if (ZKHelper.createPath(path)) { baseResponse.setRestStatus(RestStatus.SUCCESS); } else { baseResponse.setRestStatus(RestStatus.FAIL_50001); } return baseResponse; } @PostMapping(value = "/delete", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public BaseResponse deleteCfg(@RequestBody @Validated DeleteVO deleteVO) { BaseResponse baseResponse = new BaseResponse(); String path; if (StringUtils.isBlank(deleteVO.getGroupId())) { path = String.format(ZK_PATH_PATTERN0 , deleteVO.getSystemId() , deleteVO.getAppId() ); } else if (StringUtils.isNotBlank(deleteVO.getGroupId()) && StringUtils.isBlank(deleteVO.getCfgId())) { path = String.format(ZK_PATH_PATTERN1 , deleteVO.getSystemId() , deleteVO.getAppId() , deleteVO.getGroupId() ); } else { path = String.format(ZK_PATH_PATTERN , deleteVO.getSystemId() , deleteVO.getAppId() , deleteVO.getGroupId() , deleteVO.getCfgId() ); } if (ZKHelper.deletePath(path)) { baseResponse.setRestStatus(RestStatus.SUCCESS); } else { baseResponse.setRestStatus(RestStatus.FAIL_50001); } return baseResponse; } @GetMapping(value = "/getdata", produces = MediaType.TEXT_PLAIN_VALUE) public String getData(@RequestParam String path) { return ZKHelper.getData(path); } }
为配置管理前端提供配置保存、配置推送、配置删除等操作。
配置中心测试
@Component @ConfigurationProperties(prefix = "cfg.test") public class TestCfgcenterBean extends BaseCfgcenterBean { @ConfigField private String text; @ConfigField private Map<String, List<String>> map; public String getText() { return text; } public void setText(String text) { this.text = text; } public Map<String, List<String>> getMap() { return map; } public void setMap(Map<String, List<String>> map) { this.map = map; } @Override protected String getDefaultResourcePath() { return StringUtils.EMPTY; } @Override protected void refresh() { super.refresh(); System.out.println("text=" + this.text); System.out.println("map=" + JSON.toJSONString(map)); } }
TestCfgcenterBean继承BaseCfgcenterBean,配置中心配置变更后可以自动将新的配置绑定到对象上。
@SpringBootApplication(exclude = RedissonAutoConfiguration.class) @EnableAspectJAutoProxy(proxyTargetClass = true, exposeProxy = true) @EnableRetry public class SpringbootApplication { public static void main(String[] args) { System.setProperty("xxx.system.id", "test_system"); System.setProperty("xxx.app.id", "test_app"); System.setProperty("groupenv", "x"); SpringApplication.run(SpringbootApplication.class, args); } }
启动类设置配置中心客户端需要的环境变量:系统标识、项目标识、分组标识。
客户端与zk第一建立连接,同步完状态之后,触发INITIALIZED事件,刷新bean属性配置。
客户端与zk断开重连之后,同步完状态后触发INITIALIZED事件,刷新bean属性配置。
需要源码
请关注订阅号,回复:cfgcenter, 便可查看。
上一篇: arm开发板挂载win10 nfsAxe NFS Server
下一篇: Apollo 安装部署
推荐阅读
-
基于ZooKeeper实现简单的配置中心
-
spring cloud 入门系列七:基于Git存储的分布式配置中心--Spring Cloud Config
-
分布式系统中zookeeper实现配置管理+集群管理
-
基于zookeeper实现分布式配置中心(二)
-
分布式配置中心超轻量的实现方式(带源码)
-
SpringCloud微服务 Apollo分布式配置中心客户端获取配置(二)
-
《Spring Cloud 入门》Spring Cloud Config 基于JDBC 的分布式配置中心
-
SpringCloudAlibaba - 基于Nacos构建分布式配置中心
-
基于缓存或zookeeper的分布式锁实现
-
基于mysql和ZooKeeper的分布式锁实现