欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

spring cloud alibaba源码解析:Nacos配置更新

程序员文章站 2022-07-10 18:51:24
配置更新代码框架:当Nacos server上的配置更新的时候,nacos客户端就会去拉取新的配置。原理是:客户端会用定时器定时去拉取配置,找出发生变化的配置,然后更新到本地缓存中。定时器相关代码如下:executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Thr...

配置更新代码框架:

代码位置:nacos-client-1.2.1.jar包里面ClientWorker这个类。我这里分析的版本是1.2.1。

当Nacos server上的配置更新的时候,nacos客户端就会去拉取新的配置。原理是:客户端会用定时器定时去拉取配置,找出发生变化的配置,然后更新到本地缓存中。

定时器相关代码如下:

executor.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
        try {
            checkConfigInfo();
        } catch (Throwable e) {
            LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
        }
    }
}, 1L, 10L, TimeUnit.MILLISECONDS);

定时器采用scheduleWithFixedDelay来实现,定时调用checkConfigInfo函数来主动拉取配置,距离上次调用运行完后10毫秒启动下次调用。

public void checkConfigInfo() {
    // 分任务
    int listenerSize = cacheMap.get().size();
    // 向上取整为批数
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

这个地方,根据listener的个数(配置文件个数+1,当前我测试的情况是这样的。具体代码后面再研究一下。)来分任务,每个任务(线程)默认负责3000个listener。
这里用另一个定时器执行了LongPollingRunnable任务。

executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
        t.setDaemon(true);
        return t;
    }
});

现在来看一下LongPollingRunnable的实现,LongPollingRunnable是一个Runnable,run方法如下:

@Override
public void run() {

    List<CacheData> cacheDatas = new ArrayList<CacheData>();
    List<String> inInitializingCacheList = new ArrayList<String>();
    try {
        // check failover config
        for (CacheData cacheData : cacheMap.get().values()) {
            if (cacheData.getTaskId() == taskId) {
                cacheDatas.add(cacheData);
                try {
                    checkLocalConfig(cacheData);
                    if (cacheData.isUseLocalConfigInfo()) {
                        cacheData.checkListenerMd5();
                    }
                } catch (Exception e) {
                    LOGGER.error("get local config info error", e);
                }
            }
        }

        // check server config
        List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
        LOGGER.info("get changedGroupKeys:" + changedGroupKeys);

        for (String groupKey : changedGroupKeys) {
            String[] key = GroupKey.parseKey(groupKey);
            String dataId = key[0];
            String group = key[1];
            String tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                cache.setContent(ct[0]);
                if (null != ct[1]) {
                    cache.setType(ct[1]);
                }
                LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                    agent.getName(), dataId, group, tenant, cache.getMd5(),
                    ContentUtils.truncateContent(ct[0]), ct[1]);
            } catch (NacosException ioe) {
                String message = String.format(
                    "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                    agent.getName(), dataId, group, tenant);
                LOGGER.error(message, ioe);
            }
        }
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isInitializing() || inInitializingCacheList
                .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                cacheData.checkListenerMd5();
                cacheData.setInitializing(false);
            }
        }
        inInitializingCacheList.clear();

        executorService.execute(this);

    } catch (Throwable e) {

        // If the rotation training task is abnormal, the next execution time of the task will be punished
        LOGGER.error("longPolling error : ", e);
        executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
    }
}

定时读取Nacos Server的配置,如果读取的配置没有发生变化,则等待30s后,返回,执行下一次循环。
如果有更新立即返回。
checkUpdateDataIds获取更新后的group key,形式是dataId+group+tenant的形式。
checkUpdateDataIds方法里面,

long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
    agent.getEncode(), readTimeoutMs);

post请求路径是:/v1/cs/configs/listener
根据阿里接口文档,知道该接口是监听配置接口。
spring cloud alibaba源码解析:Nacos配置更新
getServerConfig方法根据变化的groupkey列表,获取配置内容,使用Open-api的/v1/cs/configs获取配置接口。然后更新本地缓存。

String[] ct = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);
if (null != ct[1]) {
    cache.setType(ct[1]);
}

checkListenerMd5方法,更新listener的md5和content内容,然后发送refresh事件通知,来更新类中的配置字段。

NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "Refresh Nacos config"));

执行下一次循环:

executorService.execute(this); 

出错的时候,延时taskPenaltyTime ms再执行下一次循环:

executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);

本文地址:https://blog.csdn.net/songhongjin/article/details/107662632