spring cloud alibaba源码解析:Nacos配置更新
配置更新代码框架:
代码位置:nacos-client-1.2.1.jar包里面ClientWorker这个类。我这里分析的版本是1.2.1。
当Nacos server上的配置更新的时候,nacos客户端就会去拉取新的配置。原理是:客户端会用定时器定时去拉取配置,找出发生变化的配置,然后更新到本地缓存中。
定时器相关代码如下:
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
定时器采用scheduleWithFixedDelay来实现,定时调用checkConfigInfo函数来主动拉取配置,距离上次调用运行完后10毫秒启动下次调用。
public void checkConfigInfo() {
// 分任务
int listenerSize = cacheMap.get().size();
// 向上取整为批数
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
这个地方,根据listener的个数(配置文件个数+1,当前我测试的情况是这样的。具体代码后面再研究一下。)来分任务,每个任务(线程)默认负责3000个listener。
这里用另一个定时器执行了LongPollingRunnable任务。
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
现在来看一下LongPollingRunnable的实现,LongPollingRunnable是一个Runnable,run方法如下:
@Override
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// check failover config
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
// check server config
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);
if (null != ct[1]) {
cache.setType(ct[1]);
}
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(ct[0]), ct[1]);
} catch (NacosException ioe) {
String message = String.format(
"[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
executorService.execute(this);
} catch (Throwable e) {
// If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
定时读取Nacos Server的配置,如果读取的配置没有发生变化,则等待30s后,返回,执行下一次循环。
如果有更新立即返回。
checkUpdateDataIds获取更新后的group key,形式是dataId+group+tenant的形式。
checkUpdateDataIds方法里面,
long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
agent.getEncode(), readTimeoutMs);
post请求路径是:/v1/cs/configs/listener
根据阿里接口文档,知道该接口是监听配置接口。
getServerConfig方法根据变化的groupkey列表,获取配置内容,使用Open-api的/v1/cs/configs获取配置接口。然后更新本地缓存。
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);
if (null != ct[1]) {
cache.setType(ct[1]);
}
checkListenerMd5方法,更新listener的md5和content内容,然后发送refresh事件通知,来更新类中的配置字段。
NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "Refresh Nacos config"));
执行下一次循环:
executorService.execute(this);
出错的时候,延时taskPenaltyTime ms再执行下一次循环:
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
本文地址:https://blog.csdn.net/songhongjin/article/details/107662632
推荐阅读
-
Spring Cloud Alibaba | Nacos配置管理
-
荐 spring cloud alibaba nacos 实现配置管理
-
Spring Cloud Alibaba Sentinel用nacos配置规则
-
Nacos--在Spring cloud中使用Spring Cloud Alibaba Nacos Discovery(服务注册+配置管理示例)
-
spring cloud alibaba之nacos配置中心
-
Spring Cloud Alibaba + Nacos Config实现配置动态更新
-
Spring Cloud Alibaba Nacos配置中心(二)
-
Spring Cloud Alibaba(三)——使用 Nacos config 实现配置管理
-
记录使用spring-cloud-starter-alibaba-nacos-config 注册到 nacos 时配置问题。
-
使用 Spring Cloud Alibaba Nacos Config 作为配置中心