Nacos配置服务原理
nacos client配置机制
spring加载远程配置
在了解nacos客户端配置之前,我们先看看spring怎么样加载远程配置的。spring 提供了加载远程配置的扩展接口 propertysourcelocator。下面看个简单的例子:
实现propertysourcelocator
public class greizpropertysourcelocator implements propertysourcelocator { @override public propertysource<?> locate(environment environment) { // 自定义配置,来源可以从任何地方 map<string, object> source = new hashmap<>(); source.put("username", "greiz"); source.put("userage", 18); return new mappropertysource(greizpropertysource.property_name, source); } }
propertysourcelocator 只有一个接口,我们可以在该接口实现自定义配置的加载,比如从数据库中获取配置,或者文件中获取配置等。
springboot启动配置类
@configuration public class greizconfigbootstrapconfiguration { @bean public greizpropertysourcelocator greizpropertysourcelocator() { return new greizpropertysourcelocator(); } }
在meta-inf/spring.factories添加启动指定加载类
org.springframework.cloud.bootstrap.bootstrapconfiguration=\ com.greiz.demo.config.greizconfigbootstrapconfiguration
使用
@component public class greiz { @value("${username}") private string name; @value("${userage}") private integer age; // 省getter/setter }
跟本地配置一样使用。
spring启动加载远程配置流程
在spring启动preparecontext阶段会执行propertysourcelocator所有实现类加载自定义的配置,最终添加到environment中管理。
nacos-client
拉取远程配置
nacos客户端启动时加载远程配置就是用了上面的方式。下面我们根据源码看一下具体过程。nacospropertysourcelocator 实现了 propertysourcelocator,所以spring启动时会调用locate方法。
public propertysource<?> locate(environment env) { // 1. 创建一个跟远程打交道的对象nacosconfigservice configservice configservice = nacosconfigproperties.configserviceinstance(); ... 省略代码 // 2. 操作nacospropertysource对象,下面三个方法最终都会调用该对象build nacospropertysourcebuilder = new nacospropertysourcebuilder(configservice, timeout); // 3. string name = nacosconfigproperties.getname(); string dataidprefix = nacosconfigproperties.getprefix(); if (stringutils.isempty(dataidprefix)) { dataidprefix = name; } if (stringutils.isempty(dataidprefix)) { dataidprefix = env.getproperty("spring.application.name"); } // 从远程获取的properties会存放到该类,最终放到environment中 compositepropertysource composite = new compositepropertysource(nacos_property_source_name); // 加载公共模块配置 loadsharedconfiguration(composite); // 加载扩展配置 loadextconfiguration(composite); // 加载独有配置 loadapplicationconfiguration(composite, dataidprefix, nacosconfigproperties, env); return composite; }
1处 - 创建 configservice 对象,是通过反射创建出 nacosconfigservice 实例。该类是nacos client 跟 nacos server 重要的对接者。后面会围绕该类细讲。
2处 - 创建 nacospropertysourcebuilder 实例,用于构建和缓存 nacospropertysource,刷新时会用到此处缓存。
3处 - 加载配置的顺序,公共配置 -> 扩展配置 -> 私有配置,如果有相同key的后面的覆盖前面的。默认的 data id 生成规则 ${spring.application.name}.properties。
加载三种配置最终都会调用 nacospropertysourcebuilder.build() 方法。
nacospropertysource build(string dataid, string group, string fileextension, boolean isrefreshable) { // 加载配置 properties p = loadnacosdata(dataid, group, fileextension); nacospropertysource nacospropertysource = new nacospropertysource(group, dataid, propertiestomap(p), new date(), isrefreshable); // 缓存nacospropertysource nacospropertysourcerepository.collectnacospropertysources(nacospropertysource); return nacospropertysource; }
加载配置后封装nacospropertysource,并缓存。
主要逻辑在 nacospropertysourcebuilder.loadnacosdata() 中。
private properties loadnacosdata(string dataid, string group, string fileextension) { // 获取配置 string data = configservice.getconfig(dataid, group, timeout); ... 省略代码 // .properties扩展名 if (fileextension.equalsignorecase("properties")) { properties properties = new properties(); properties.load(new stringreader(data)); return properties; } else if (fileextension.equalsignorecase("yaml") || fileextension.equalsignorecase("yml")) {// .yaml或者.yml扩展名 yamlpropertiesfactorybean yamlfactory = new yamlpropertiesfactorybean(); yamlfactory.setresources(new bytearrayresource(data.getbytes())); return yamlfactory.getobject(); } return empty_properties; }
把远程获取到的数据根据扩展名解析成统一的properties。nacos控制台配置支持properties和yaml两个扩展名。
真正获取远程配置的是 nacosconfigservice.getconfig(), 调用getconfiginner()。
private string getconfiginner(string tenant, string dataid, string group, long timeoutms) throws nacosexception { group = null2defaultgroup(group); paramutils.checkkeyparam(dataid, group); configresponse cr = new configresponse(); cr.setdataid(dataid); cr.settenant(tenant); cr.setgroup(group); // 1. 优先使用failvoer配置 string content = localconfiginfoprocessor.getfailover(agent.getname(), dataid, group, tenant); if (content != null) { cr.setcontent(content); configfilterchainmanager.dofilter(null, cr); content = cr.getcontent(); return content; } try { // 2. 服务器获取配置 content = worker.getserverconfig(dataid, group, tenant, timeoutms); cr.setcontent(content); configfilterchainmanager.dofilter(null, cr); content = cr.getcontent(); return content; } catch (nacosexception ioe) { if (nacosexception.no_right == ioe.geterrcode()) { throw ioe; } } // 3. 当服务器挂了就拿本地快照 content = localconfiginfoprocessor.getsnapshot(agent.getname(), dataid, group, tenant); cr.setcontent(content); configfilterchainmanager.dofilter(null, cr); content = cr.getcontent(); return content; }
1处 - 优先从failvoer获取配置,该文件是怎么样产生的,我暂时还不是很清楚,后面搞懂补充。
2处 - 从nacos服务中获取配置。
3处 - 如果2失败了就从本地快照文件获取。该文件由首次读取远程配置文件生成,并且之后轮询配置更新时如果有更新也会对应更新该文件。
访问服务接口的脏活当然需要一个客户端工作者clientworker,下面是 nacosconfigservice.getconfig() 中调用 clientworker.getserverconfig()。
public string getserverconfig(string dataid, string group, string tenant, long readtimeout) throws nacosexception { // 就是这么简单http请求获取的配置 httpresult result = agent.httpget(constants.config_controller_path, null, params, agent.getencode(), readtimeout); ... 省略代码 // 写本地文件快照 localconfiginfoprocessor.savesnapshot(agent.getname(), dataid, group, tenant, result.content); ...省略代码 return result.content; }
看了上面获取远程配置的代码是不是想喊出f**k,怎么这么简单!!!是的,用http请求 http://ip:port/v1/cs/configs 接口,跟nacos控制台页面访问是一样的。
到此nacos client启动读取远程配置并封装到environment结束了。
长轮询获取更新
前一小节是对项目启动时nacos client加载远程配置过程分析,本节将对项目运行中配置改变了nacos client是怎么样悉知的分析。
前面提到 nacosconfigservice 是 nacos client 对接 nacos server 的桥梁,下面看一下该类在配置更新过程怎么样运作的。先看一下 nacosconfigservice 的构造方法。
public nacosconfigservice(properties properties) throws nacosexception { ... 省略代码 // 初始化 namespace initnamespace(properties); // 查询服务列表变化情况 agent = new metricshttpagent(new serverhttpagent(properties)); agent.start(); // 配置更新解决方案在这里面 worker = new clientworker(agent, configfilterchainmanager, properties); }
在构造函数中初始化 encode、namespace、httpagent 和 clientworker。
httpagent 是通过http获取服务地址列表代理类,维护这服务地址列表和客户端本地一致。
clientworker 是维护服务端配置和客户端配置一致的工作者。前面初始化获取远程配置时也是该对象。
clientworker 内部是怎么样维护客户端属性更新呢?看一下 clientworker 构造函数干了啥。
public clientworker(final httpagent agent, final configfilterchainmanager configfilterchainmanager, final properties properties) { ...省略代码 executor = executors.newscheduledthreadpool(1, new threadfactory() { ...省略代码 }); executorservice = executors.newscheduledthreadpool(runtime.getruntime().availableprocessors(), new threadfactory() { ...省略代码 }); // 每10毫秒检查一遍配置 executor.schedulewithfixeddelay(new runnable() { @override public void run() { try { checkconfiginfo(); } catch (throwable e) { logger.error("[" + agent.getname() + "] [sub-check] rotate check error", e); } } }, 1l, 10l, timeunit.milliseconds); }
clientworker 构造函数创建了两个线程池。executor 创建了一个定时任务,每10毫秒执行一次 checkconfiginfo(); executorservice 作用是什么我们接着往下看。
public void checkconfiginfo() { // 分任务 向上取整为批数 int listenersize = cachemap.get().size(); int longingtaskcount = (int) math.ceil(listenersize / paramutil.getpertaskconfigsize()); if (longingtaskcount > currentlongingtaskcount) { for (int i = (int) currentlongingtaskcount; i < longingtaskcount; i++) { executorservice.execute(new longpollingrunnable(i)); } currentlongingtaskcount = longingtaskcount; } }
以分段方式把任务拆分交给 executorservice 执行,默认3000个配置在一个任务中。executor 和 executorservice 是不是很像 netty 中的 boos 和 worker? reactor 模式,分工明确。
longpollingrunnable 是 clientworker 一个成员类,实现 runnable 接口。看一下 run() 方法。
public void run() { list<cachedata> cachedatas = new arraylist<cachedata>(); list<string> ininitializingcachelist = new arraylist<string>(); try { // 1. 只处理该任务中的配置并且检查failover配置 for (cachedata cachedata : cachemap.get().values()) { if (cachedata.gettaskid() == taskid) { cachedatas.add(cachedata); try { checklocalconfig(cachedata); if (cachedata.isuselocalconfiginfo()) { cachedata.checklistenermd5(); } } catch (exception e) { logger.error("get local config info error", e); } } } // 2. 把客户端的md5值跟服务端的md5比较,把不一样的配置以 "example.properties+default_group"方式返回 list<string> changedgroupkeys = checkupdatedataids(cachedatas, ininitializingcachelist); // 3. 把有更新的配置重新从服务端拉取配置内容 for (string groupkey : changedgroupkeys) { string[] key = groupkey.parsekey(groupkey); string dataid = key[0]; string group = key[1]; string tenant = null; if (key.length == 3) { tenant = key[2]; } try { string content = getserverconfig(dataid, group, tenant, 3000l); cachedata cache = cachemap.get().get(groupkey.getkeytenant(dataid, group, tenant)); // 修改客户端本地值并且重新计算该对象的md5值 cache.setcontent(content); } catch (nacosexception ioe) { ...省略代码 } } for (cachedata cachedata : cachedatas) { if (!cachedata.isinitializing() || ininitializingcachelist.contains(groupkey.getkeytenant(cachedata.dataid, cachedata.group, cachedata.tenant))) { // 4. 根据md5值检查是否更新,如果更新通知listener cachedata.checklistenermd5(); cachedata.setinitializing(false); } } ininitializingcachelist.clear(); // 5. 又把this放进线程池中,形成一个长轮询检查客户端和服务端配置一致性 executorservice.execute(this); } catch (throwable e) { executorservice.schedule(this, taskpenaltytime, timeunit.milliseconds); } }
1处 - 筛选属于该任务的配置,并检查 failover 配置。
2处 - 把配置以"dataid group md5 tenant\r\n"拼接后当做参数请求服务器 http://ip:port/v1/cs/configs/listener 接口。服务器返回有更新的配置,以 "example.properties+default_group"方式返回
3处 - 根据2处返回的列表遍历请求服务器 http://ip:port/v1/cs/configs 接口,获取最新配置。然后更新cachedata content值并更新md5值。
4处 - 把 cachedata 新的md5值跟之前的做比较,如果不一样就通知监听者更新值。下一节会跟进去详解。
5处 - 把该 runnable 对象重新放入线程池,形成一个长轮询。
本节分析了 nacos client 配置是怎么样保持跟服务器接近实时同步的。通过长轮询+http短连接方式。
刷新值
在开始本节之前,我们先看一下上面多次出现的一个类 cachedata 结构。
public class cachedata { private final string name; private final configfilterchainmanager configfilterchainmanager; public final string dataid; public final string group; public final string tenant; // 监听列表 private final copyonwritearraylist<managerlistenerwrap> listeners; // 内容md5值 private volatile string md5; // 是否使用本地配置 private volatile boolean isuselocalconfig = false; // 本地版本号 private volatile long localconfiglastmodified; private volatile string content; // 长轮询中分段任务id private int taskid; private volatile boolean isinitializing = true; ...省略代码 }
根据名字可以得知, cachedata 是配置数据缓存中的对象。listeners 属性比较有意思,在 bo 中拥有一个监听列表,当该对象md5改变时会通过遍历 listeners 通知监听者们。
前一节从服务端获取到有更新的配置之后会检查md5,调用 cachedata.checklistenermd5()方法:
void checklistenermd5() { for (managerlistenerwrap wrap : listeners) { if (!md5.equals(wrap.lastcallmd5)) { safenotifylistener(dataid, group, content, md5, wrap); } } }
class managerlistenerwrap { final listener listener; string lastcallmd5 = cachedata.getmd5string(null); ... 省略代码 }
managerlistenerwrap 的 lastcallmd5 是旧配置的md5值,如果 cachedata 的md5和 managerlistenerwrap 的lastcallmd5 值不一样,说明配置有更新。需要通知未更新的监听者。
private void safenotifylistener(final string dataid, final string group, final string content, final string md5, final managerlistenerwrap listenerwrap) { final listener listener = listenerwrap.listener; runnable job = new runnable() { @override public void run() { ... 省略代码 // 调用监听者的方法 listener.receiveconfiginfo(contenttmp); listenerwrap.lastcallmd5 = md5; ... 省略代码 } }; try { if (null != listener.getexecutor()) { listener.getexecutor().execute(job); } else { job.run(); } } catch (throwable t) { } }
调用了监听者的 receiveconfiginfo() 方法,然后修改 managerlistenerwrap 的lastcallmd5 值。
本节到这里分析了从服务端获取更新配置后通知配置监听者。但是监听者是什么时候注册的呢?接下来我们继续分析监听者注册到 cachedata 过程。
nacoscontextrefresher 实现了applicationlistener
通过 nacosconfigservice.addlistener()注册监听者。 nacosconfigservice.addlistener(): 还是交给了 clientworker clientworker.addtenantlisteners() clientworker 把监听者交给了 cachedata 完成了注册。 汇总系统运行中更新配置的流程: nacos config client 和 nacos config server 采用定时长轮询http请求访问配置更新,这样设计 nacos config server 和 config client 结构简单。server 也没有长连接模式client过多的压力。private void registernacoslistener(final string group, final string dataid) {
listener listener = listenermap.computeifabsent(dataid, i -> new listener() {
// 通知监听者调用的就是这个方法啦
@override
public void receiveconfiginfo(string configinfo) {
refreshcountincrement();
string md5 = "";
if (!stringutils.isempty(configinfo)) {
try {
messagedigest md = messagedigest.getinstance("md5");
md5 = new biginteger(1, md.digest(configinfo.getbytes("utf-8"))).tostring(16);
}
catch (nosuchalgorithmexception | unsupportedencodingexception e) {
log.warn("[nacos] unable to get md5 for dataid: " + dataid, e);
}
}
refreshhistory.add(dataid, md5);
// spring的刷新事件通知,刷新监听者会被执行
applicationcontext.publishevent(new refreshevent(this, null, "refresh nacos config"));
}
@override
public executor getexecutor() {
return null;
}
});
// 注册本监听者
configservice.addlistener(dataid, group, listener);
...省略代码
}
public void addlistener(string dataid, string group, listener listener) throws nacosexception {
worker.addtenantlisteners(dataid, group, arrays.aslist(listener));
}
public void addtenantlisteners(string dataid, string group, list<? extends listener> listeners) throws nacosexception {
group = null2defaultgroup(group);
string tenant = agent.gettenant();
cachedata cache = addcachedataifabsent(dataid, group, tenant);
for (listener listener : listeners) {
cache.addlistener(listener);
}
}
总结
上一篇: Java性能 -- CAS乐观锁
下一篇: redis 发布订阅
推荐阅读
-
centos7系统安装配置openvpn服务端
-
CentOS6.5环境安装nginx服务器及负载均衡配置操作详解
-
详解Docker Swarm服务发现和负载均衡原理
-
spring5 源码深度解析----- 被面试官给虐懵了,竟然是因为我不懂@Configuration配置类及@Bean的原理
-
tomcat配置详解web(简述tomcat工作原理)
-
Android socket实现原理详解 服务端和客户端如何搭建
-
linux 学习第十八天学习(DNS分离解析、DHCP配置、邮件服务配置)
-
Linux服务器配置—搭建NFS服务器步骤
-
apache服务器全局配置详解(全)
-
基于Linux网关服务器squid配置过程详解