欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Nacos配置服务原理

程序员文章站 2022-05-15 10:45:33
Nacos Client配置机制 spring加载远程配置 在了解NACOS客户端配置之前,我们先看看spring怎么样加载远程配置的。spring 提供了加载远程配置的扩展接口 PropertySourceLocator。下面看个简单的例子: 实现PropertySourceLocator Pro ......

nacos client配置机制

spring加载远程配置

在了解nacos客户端配置之前,我们先看看spring怎么样加载远程配置的。spring 提供了加载远程配置的扩展接口 propertysourcelocator。下面看个简单的例子:

实现propertysourcelocator

public class greizpropertysourcelocator implements propertysourcelocator {
    @override
    public propertysource<?> locate(environment environment) {
        // 自定义配置,来源可以从任何地方
        map<string, object> source = new hashmap<>();
        source.put("username", "greiz");
        source.put("userage", 18);
        return new mappropertysource(greizpropertysource.property_name, source);
    }
}

propertysourcelocator 只有一个接口,我们可以在该接口实现自定义配置的加载,比如从数据库中获取配置,或者文件中获取配置等。

springboot启动配置类

@configuration
public class greizconfigbootstrapconfiguration {
    @bean
    public greizpropertysourcelocator greizpropertysourcelocator() {
        return new greizpropertysourcelocator();
    }
}

在meta-inf/spring.factories添加启动指定加载类

org.springframework.cloud.bootstrap.bootstrapconfiguration=\
com.greiz.demo.config.greizconfigbootstrapconfiguration

使用

@component
public class greiz {
    @value("${username}")
    private string name;
    @value("${userage}")
    private integer age;
      // 省getter/setter
}

跟本地配置一样使用。

spring启动加载远程配置流程

Nacos配置服务原理

在spring启动preparecontext阶段会执行propertysourcelocator所有实现类加载自定义的配置,最终添加到environment中管理。

nacos-client

拉取远程配置

nacos客户端启动时加载远程配置就是用了上面的方式。下面我们根据源码看一下具体过程。nacospropertysourcelocator 实现了 propertysourcelocator,所以spring启动时会调用locate方法。

public propertysource<?> locate(environment env) {
   // 1. 创建一个跟远程打交道的对象nacosconfigservice
   configservice configservice = nacosconfigproperties.configserviceinstance();
   ... 省略代码
   // 2. 操作nacospropertysource对象,下面三个方法最终都会调用该对象build
   nacospropertysourcebuilder = new nacospropertysourcebuilder(configservice, timeout);
   // 3. 
   string name = nacosconfigproperties.getname();
   string dataidprefix = nacosconfigproperties.getprefix();
   if (stringutils.isempty(dataidprefix)) {
      dataidprefix = name;
   }
   if (stringutils.isempty(dataidprefix)) {
      dataidprefix = env.getproperty("spring.application.name");
   }
   // 从远程获取的properties会存放到该类,最终放到environment中
   compositepropertysource composite = new compositepropertysource(nacos_property_source_name);
   // 加载公共模块配置
   loadsharedconfiguration(composite);
   // 加载扩展配置
   loadextconfiguration(composite);
   // 加载独有配置
   loadapplicationconfiguration(composite, dataidprefix, nacosconfigproperties, env);
   return composite;
}

1处 - 创建 configservice 对象,是通过反射创建出 nacosconfigservice 实例。该类是nacos client 跟 nacos server 重要的对接者。后面会围绕该类细讲。

2处 - 创建 nacospropertysourcebuilder 实例,用于构建和缓存 nacospropertysource,刷新时会用到此处缓存。

3处 - 加载配置的顺序,公共配置 -> 扩展配置 -> 私有配置,如果有相同key的后面的覆盖前面的。默认的 data id 生成规则 ${spring.application.name}.properties。

加载三种配置最终都会调用 nacospropertysourcebuilder.build() 方法。

nacospropertysource build(string dataid, string group, string fileextension, boolean isrefreshable) {
   // 加载配置
   properties p = loadnacosdata(dataid, group, fileextension);
   nacospropertysource nacospropertysource = new nacospropertysource(group, dataid, propertiestomap(p), new date(), isrefreshable);
   // 缓存nacospropertysource
   nacospropertysourcerepository.collectnacospropertysources(nacospropertysource);
   return nacospropertysource;
}

加载配置后封装nacospropertysource,并缓存。

主要逻辑在 nacospropertysourcebuilder.loadnacosdata() 中。

private properties loadnacosdata(string dataid, string group, string fileextension) {
    // 获取配置
    string data = configservice.getconfig(dataid, group, timeout);
    ... 省略代码
    // .properties扩展名
    if (fileextension.equalsignorecase("properties")) {
        properties properties = new properties();
        properties.load(new stringreader(data));
        return properties;
    } else if (fileextension.equalsignorecase("yaml") || fileextension.equalsignorecase("yml"))         {// .yaml或者.yml扩展名
      yamlpropertiesfactorybean yamlfactory = new yamlpropertiesfactorybean();
      yamlfactory.setresources(new bytearrayresource(data.getbytes()));
      return yamlfactory.getobject();
     }
   return empty_properties;
}

把远程获取到的数据根据扩展名解析成统一的properties。nacos控制台配置支持properties和yaml两个扩展名。

真正获取远程配置的是 nacosconfigservice.getconfig(), 调用getconfiginner()。

private string getconfiginner(string tenant, string dataid, string group, long timeoutms) throws nacosexception {
    group = null2defaultgroup(group);
    paramutils.checkkeyparam(dataid, group);
    configresponse cr = new configresponse();
    cr.setdataid(dataid);
    cr.settenant(tenant);
    cr.setgroup(group);

    // 1. 优先使用failvoer配置
    string content = localconfiginfoprocessor.getfailover(agent.getname(), dataid, group, tenant);
    if (content != null) {
        cr.setcontent(content);
        configfilterchainmanager.dofilter(null, cr);
        content = cr.getcontent();
        return content;
    }

    try {
        // 2. 服务器获取配置
        content = worker.getserverconfig(dataid, group, tenant, timeoutms);
        cr.setcontent(content);
        configfilterchainmanager.dofilter(null, cr);
        content = cr.getcontent();
        return content;
    } catch (nacosexception ioe) {
        if (nacosexception.no_right == ioe.geterrcode()) {
            throw ioe;
        }
    }

    // 3. 当服务器挂了就拿本地快照
    content = localconfiginfoprocessor.getsnapshot(agent.getname(), dataid, group, tenant);
    cr.setcontent(content);
    configfilterchainmanager.dofilter(null, cr);
    content = cr.getcontent();
    return content;
}

1处 - 优先从failvoer获取配置,该文件是怎么样产生的,我暂时还不是很清楚,后面搞懂补充。

2处 - 从nacos服务中获取配置。

3处 - 如果2失败了就从本地快照文件获取。该文件由首次读取远程配置文件生成,并且之后轮询配置更新时如果有更新也会对应更新该文件。

访问服务接口的脏活当然需要一个客户端工作者clientworker,下面是 nacosconfigservice.getconfig() 中调用 clientworker.getserverconfig()。

public string getserverconfig(string dataid, string group, string tenant, long readtimeout)
    throws nacosexception {
    // 就是这么简单http请求获取的配置
    httpresult result = agent.httpget(constants.config_controller_path, null, params, agent.getencode(), readtimeout);
  ... 省略代码
    // 写本地文件快照
    localconfiginfoprocessor.savesnapshot(agent.getname(), dataid, group, tenant, result.content);
  ...省略代码
        return result.content;
}

看了上面获取远程配置的代码是不是想喊出f**k,怎么这么简单!!!是的,用http请求 http://ip:port/v1/cs/configs 接口,跟nacos控制台页面访问是一样的。

到此nacos client启动读取远程配置并封装到environment结束了。

长轮询获取更新

前一小节是对项目启动时nacos client加载远程配置过程分析,本节将对项目运行中配置改变了nacos client是怎么样悉知的分析。

前面提到 nacosconfigservice 是 nacos client 对接 nacos server 的桥梁,下面看一下该类在配置更新过程怎么样运作的。先看一下 nacosconfigservice 的构造方法。

public nacosconfigservice(properties properties) throws nacosexception {
    ... 省略代码
    // 初始化 namespace
    initnamespace(properties);
    // 查询服务列表变化情况
    agent = new metricshttpagent(new serverhttpagent(properties));
    agent.start();
    // 配置更新解决方案在这里面
    worker = new clientworker(agent, configfilterchainmanager, properties);
}

在构造函数中初始化 encode、namespace、httpagent 和 clientworker。

httpagent 是通过http获取服务地址列表代理类,维护这服务地址列表和客户端本地一致。

clientworker 是维护服务端配置和客户端配置一致的工作者。前面初始化获取远程配置时也是该对象。

clientworker 内部是怎么样维护客户端属性更新呢?看一下 clientworker 构造函数干了啥。

public clientworker(final httpagent agent, final configfilterchainmanager configfilterchainmanager, final properties properties) {
        ...省略代码
    executor = executors.newscheduledthreadpool(1, new threadfactory() {
        ...省略代码
    });
  
    executorservice = executors.newscheduledthreadpool(runtime.getruntime().availableprocessors(), new threadfactory() {
        ...省略代码
    });

    // 每10毫秒检查一遍配置
    executor.schedulewithfixeddelay(new runnable() {
        @override
        public void run() {
            try {
                checkconfiginfo();
            } catch (throwable e) {
                logger.error("[" + agent.getname() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1l, 10l, timeunit.milliseconds);
}

clientworker 构造函数创建了两个线程池。executor 创建了一个定时任务,每10毫秒执行一次 checkconfiginfo(); executorservice 作用是什么我们接着往下看。

public void checkconfiginfo() {
    // 分任务 向上取整为批数
    int listenersize = cachemap.get().size();
    int longingtaskcount = (int) math.ceil(listenersize / paramutil.getpertaskconfigsize());
    if (longingtaskcount > currentlongingtaskcount) {
        for (int i = (int) currentlongingtaskcount; i < longingtaskcount; i++) {
            executorservice.execute(new longpollingrunnable(i));
        }
        currentlongingtaskcount = longingtaskcount;
    }
}

以分段方式把任务拆分交给 executorservice 执行,默认3000个配置在一个任务中。executor 和 executorservice 是不是很像 netty 中的 boos 和 worker? reactor 模式,分工明确。

longpollingrunnable 是 clientworker 一个成员类,实现 runnable 接口。看一下 run() 方法。

public void run() {
    list<cachedata> cachedatas = new arraylist<cachedata>();
    list<string> ininitializingcachelist = new arraylist<string>();
    try {
        // 1. 只处理该任务中的配置并且检查failover配置
        for (cachedata cachedata : cachemap.get().values()) {
            if (cachedata.gettaskid() == taskid) {
                cachedatas.add(cachedata);
                try {
                    checklocalconfig(cachedata);
                    if (cachedata.isuselocalconfiginfo()) {
                        cachedata.checklistenermd5();
                    }
                } catch (exception e) {
                    logger.error("get local config info error", e);
                }
            }
        }
                // 2. 把客户端的md5值跟服务端的md5比较,把不一样的配置以 "example.properties+default_group"方式返回
        list<string> changedgroupkeys = checkupdatedataids(cachedatas, ininitializingcachelist);
       // 3. 把有更新的配置重新从服务端拉取配置内容
        for (string groupkey : changedgroupkeys) {
            string[] key = groupkey.parsekey(groupkey);
            string dataid = key[0];
            string group = key[1];
            string tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                string content = getserverconfig(dataid, group, tenant, 3000l);
                cachedata cache = cachemap.get().get(groupkey.getkeytenant(dataid, group, tenant));
                // 修改客户端本地值并且重新计算该对象的md5值
                cache.setcontent(content);
            } catch (nacosexception ioe) {
                ...省略代码
            }
        }
        for (cachedata cachedata : cachedatas) {
            if (!cachedata.isinitializing() || ininitializingcachelist.contains(groupkey.getkeytenant(cachedata.dataid, cachedata.group, cachedata.tenant))) {
                // 4. 根据md5值检查是否更新,如果更新通知listener
                cachedata.checklistenermd5();
                cachedata.setinitializing(false);
            }
        }
        ininitializingcachelist.clear();
        // 5. 又把this放进线程池中,形成一个长轮询检查客户端和服务端配置一致性
        executorservice.execute(this);
    } catch (throwable e) {
        executorservice.schedule(this, taskpenaltytime, timeunit.milliseconds);
    }
}

1处 - 筛选属于该任务的配置,并检查 failover 配置。

2处 - 把配置以"dataid group md5 tenant\r\n"拼接后当做参数请求服务器 http://ip:port/v1/cs/configs/listener 接口。服务器返回有更新的配置,以 "example.properties+default_group"方式返回

3处 - 根据2处返回的列表遍历请求服务器 http://ip:port/v1/cs/configs 接口,获取最新配置。然后更新cachedata content值并更新md5值。

4处 - 把 cachedata 新的md5值跟之前的做比较,如果不一样就通知监听者更新值。下一节会跟进去详解。

5处 - 把该 runnable 对象重新放入线程池,形成一个长轮询。

本节分析了 nacos client 配置是怎么样保持跟服务器接近实时同步的。通过长轮询+http短连接方式。

刷新值

在开始本节之前,我们先看一下上面多次出现的一个类 cachedata 结构。

public class cachedata {
    private final string name;
    private final configfilterchainmanager configfilterchainmanager;
    public final string dataid;
    public final string group;
    public final string tenant;
    // 监听列表
    private final copyonwritearraylist<managerlistenerwrap> listeners;
    // 内容md5值
    private volatile string md5;
    // 是否使用本地配置
    private volatile boolean isuselocalconfig = false;
    // 本地版本号
    private volatile long localconfiglastmodified;
    private volatile string content;
    // 长轮询中分段任务id
    private int taskid;
    private volatile boolean isinitializing = true;
  
        ...省略代码
}

根据名字可以得知, cachedata 是配置数据缓存中的对象。listeners 属性比较有意思,在 bo 中拥有一个监听列表,当该对象md5改变时会通过遍历 listeners 通知监听者们。

前一节从服务端获取到有更新的配置之后会检查md5,调用 cachedata.checklistenermd5()方法:

void checklistenermd5() {
   for (managerlistenerwrap wrap : listeners) {
        if (!md5.equals(wrap.lastcallmd5)) {
            safenotifylistener(dataid, group, content, md5, wrap);
        }
    }
}
class managerlistenerwrap {
    final listener listener;
    string lastcallmd5 = cachedata.getmd5string(null);
        ... 省略代码
}

managerlistenerwrap 的 lastcallmd5 是旧配置的md5值,如果 cachedata 的md5和 managerlistenerwrap 的lastcallmd5 值不一样,说明配置有更新。需要通知未更新的监听者。

private void safenotifylistener(final string dataid, final string group, final string content, final string md5, final managerlistenerwrap listenerwrap) {
    final listener listener = listenerwrap.listener;
    runnable job = new runnable() {
        @override
        public void run() {
            ... 省略代码
                // 调用监听者的方法
                listener.receiveconfiginfo(contenttmp);
                listenerwrap.lastcallmd5 = md5;
            ... 省略代码
        }
    };
    try {
        if (null != listener.getexecutor()) {
            listener.getexecutor().execute(job);
        } else {
            job.run();
        }
    } catch (throwable t) {
    }
}

调用了监听者的 receiveconfiginfo() 方法,然后修改 managerlistenerwrap 的lastcallmd5 值。

本节到这里分析了从服务端获取更新配置后通知配置监听者。但是监听者是什么时候注册的呢?接下来我们继续分析监听者注册到 cachedata 过程。

nacoscontextrefresher 实现了applicationlistener 。在容器准备后会调用 onapplicationevent() 方法,最终调用 registernacoslistener() 方法。

private void registernacoslistener(final string group, final string dataid) {
   listener listener = listenermap.computeifabsent(dataid, i -> new listener() {
     // 通知监听者调用的就是这个方法啦 
     @override
      public void receiveconfiginfo(string configinfo) {
         refreshcountincrement();
         string md5 = "";
         if (!stringutils.isempty(configinfo)) {
            try {
               messagedigest md = messagedigest.getinstance("md5");
               md5 = new biginteger(1, md.digest(configinfo.getbytes("utf-8"))).tostring(16);
            }
            catch (nosuchalgorithmexception | unsupportedencodingexception e) {
               log.warn("[nacos] unable to get md5 for dataid: " + dataid, e);
            }
         }
         refreshhistory.add(dataid, md5);
         // spring的刷新事件通知,刷新监听者会被执行
         applicationcontext.publishevent(new refreshevent(this, null, "refresh nacos config"));
      }
      @override
      public executor getexecutor() {
         return null;
      }
   });
  // 注册本监听者
  configservice.addlistener(dataid, group, listener);
  ...省略代码
}

通过 nacosconfigservice.addlistener()注册监听者。

nacosconfigservice.addlistener():

public void addlistener(string dataid, string group, listener listener) throws nacosexception {
    worker.addtenantlisteners(dataid, group, arrays.aslist(listener));
}

还是交给了 clientworker

clientworker.addtenantlisteners()

public void addtenantlisteners(string dataid, string group, list<? extends listener> listeners) throws nacosexception {
    group = null2defaultgroup(group);
    string tenant = agent.gettenant();
    cachedata cache = addcachedataifabsent(dataid, group, tenant);
    for (listener listener : listeners) {
        cache.addlistener(listener);
    }
}

clientworker 把监听者交给了 cachedata 完成了注册。

汇总系统运行中更新配置的流程:

  1. 启动时把本地更新 listener 注册到 cachedata。
  2. clientworker 长轮询同步服务端的更新配置。
  3. 2中获取到更新后的配置,重置 cachedata 内容。
  4. cachedata 回调1中注册上来的 listener.receiveconfiginfo()
  5. listener 最终通知spring刷新事件,完成context刷新属性值。

总结

nacos config client 和 nacos config server 采用定时长轮询http请求访问配置更新,这样设计 nacos config server 和 config client 结构简单。server 也没有长连接模式client过多的压力。