欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Eureka获取服务列表源码解析

程序员文章站 2022-04-14 21:45:35
在之前的文章: "EurekaClient自动装配及启动流程解析" 中,我们提到了在类 的构造方法中存在一个刷新线程和从服务端拉取注册信息的操作 这两个就是eureka获取服务列表的两种情况: 1. 全量获取:Eureka启动时拉取全部服务 2. 增量获取:一个定时任务定时获取 全量获取 全量获取使 ......

在之前的文章:eurekaclient自动装配及启动流程解析中,我们提到了在类discoveryclient的构造方法中存在一个刷新线程和从服务端拉取注册信息的操作

这两个就是eureka获取服务列表的两种情况:

  1. 全量获取:eureka启动时拉取全部服务
  2. 增量获取:一个定时任务定时获取
全量获取
if (clientconfig.shouldfetchregistry() && !fetchregistry(false)) {
         fetchregistryfrombackup();
     }

全量获取使用的fetchregistry方法,如果使用此方法没有成功获取到的话则会执行fetchregistryfrombackup方法使用备份方式拉取,备份拉取使用的是backupregistry接口的实现类,只不过eureka默认没有实现。

继续看拉取流程

private boolean fetchregistry(boolean forcefullregistryfetch) {
        stopwatch tracer = fetch_registry_timer.start();

        try {
            applications applications = getapplications();

            if (clientconfig.shoulddisabledelta()//禁用部分获取
                    || (!strings.isnullorempty(clientconfig.getregistryrefreshsinglevipaddress()))
                    || forcefullregistryfetch//全部获取
                    || (applications == null)//本地没有任何实例
                    || (applications.getregisteredapplications().size() == 0)
                    || (applications.getversion() == -1)) //client application does not have latest library supporting delta
            {
                logger.info("disable delta property : {}", clientconfig.shoulddisabledelta());
                logger.info("single vip registry refresh property : {}", clientconfig.getregistryrefreshsinglevipaddress());
                logger.info("force full registry fetch : {}", forcefullregistryfetch);
                logger.info("application is null : {}", (applications == null));
                logger.info("registered applications size is zero : {}",
                        (applications.getregisteredapplications().size() == 0));
                logger.info("application version is -1: {}", (applications.getversion() == -1));
                getandstorefullregistry();
            } else {
                getandupdatedelta(applications);
            }
            applications.setappshashcode(applications.getreconcilehashcode());
            logtotalinstances();
        } catch (throwable e) {
            logger.error(prefix + "{} - was unable to refresh its cache! status = {}", apppathidentifier, e.getmessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
  1. 首先入参forcefullregistryfetch代表的就是全量获取或者增量获取
  2. 获取本地缓存的这些实例
    private final atomicreference<applications> localregionapps = new atomicreference<applications>();

    public applications getapplications() {
        return localregionapps.get();
    }

可以看到所有实例应该缓存在localregionapps对象中

  1. 然后根据一些条件判断是否应该执行全量获取,也就是就算入参指定增量获取,但是不满足这些条件还是会进行全量获取
  2. 接着是打印当前的实例数量
  3. 最后是更新拉取到的实例的状态
全量拉取处理
    private void getandstorefullregistry() throws throwable {
        long currentupdategeneration = fetchregistrygeneration.get();

        logger.info("getting all instance registry info from the eureka server");

        applications apps = null;
        //发起获取
        eurekahttpresponse<applications> httpresponse = clientconfig.getregistryrefreshsinglevipaddress() == null
                ? eurekatransport.queryclient.getapplications(remoteregionsref.get())
                : eurekatransport.queryclient.getvip(clientconfig.getregistryrefreshsinglevipaddress(), remoteregionsref.get());
        if (httpresponse.getstatuscode() == status.ok.getstatuscode()) {
            apps = httpresponse.getentity();
        }
        logger.info("the response status is {}", httpresponse.getstatuscode());

        if (apps == null) {
            logger.error("the application is null for some reason. not storing this information");
        } else if (fetchregistrygeneration.compareandset(currentupdategeneration, currentupdategeneration + 1)) {
        //缓存结果
            localregionapps.set(this.filterandshuffle(apps));
            logger.debug("got full registry with apps hashcode {}", apps.getappshashcode());
        } else {
            logger.warn("not updating applications as another thread is updating it already");
        }
    }

其中调用的逻辑比较简单:

public eurekahttpresponse<applications> getapplications(string... regions) {
   return getapplicationsinternal("apps/", regions);
}

private eurekahttpresponse<applications> getapplicationsinternal(string urlpath, string[] regions) {
   clientresponse response = null;
   string regionsparamvalue = null;
   try {
       webresource webresource = jerseyclient.resource(serviceurl).path(urlpath);
       if (regions != null && regions.length > 0) {
           regionsparamvalue = stringutil.join(regions);
           webresource = webresource.queryparam("regions", regionsparamvalue);
       }
       builder requestbuilder = webresource.getrequestbuilder();
       addextraheaders(requestbuilder);
       response = requestbuilder.accept(mediatype.application_json_type).get(clientresponse.class); // json

       applications applications = null;
       if (response.getstatus() == status.ok.getstatuscode() && response.hasentity()) {
           applications = response.getentity(applications.class);
       }
       return aneurekahttpresponse(response.getstatus(), applications.class)
               .headers(headersof(response))
               .entity(applications)
               .build();
   } finally {
       if (logger.isdebugenabled()) {
           logger.debug("jersey http get {}/{}?{}; statuscode={}",
                   serviceurl, urlpath,
                   regionsparamvalue == null ? "" : "regions=" + regionsparamvalue,
                   response == null ? "n/a" : response.getstatus()
           );
       }
       if (response != null) {
           response.close();
       }
   }
}
全量拉取服务端处理

全量获取的服务端controller在类applicationsresource

@get
    public response getcontainers(@pathparam("version") string version,
                                  @headerparam(header_accept) string acceptheader,@headerparam(header_accept_encoding) string acceptencoding,@headerparam(eurekaaccept.http_x_eureka_accept) string eurekaaccept,@context uriinfo uriinfo,@nullable @queryparam("regions") string regionsstr) {

        boolean isremoteregionrequested = null != regionsstr && !regionsstr.isempty();
        string[] regions = null;
        if (!isremoteregionrequested) {
            eurekamonitors.get_all.increment();
        } else {
            regions = regionsstr.tolowercase().split(",");
            arrays.sort(regions); // so we don't have different caches for same regions queried in different order.
            eurekamonitors.get_all_with_remote_regions.increment();
        }

        // check if the server allows the access to the registry. the server can
        // restrict access if it is not
        // ready to serve traffic depending on various reasons.
        if (!registry.shouldallowaccess(isremoteregionrequested)) {
            return response.status(status.forbidden).build();
        }
        currentrequestversion.set(version.toenum(version));
        keytype keytype = key.keytype.json;
        string returnmediatype = mediatype.application_json;
        if (acceptheader == null || !acceptheader.contains(header_json_value)) {
            keytype = key.keytype.xml;
            returnmediatype = mediatype.application_xml;
        }

        key cachekey = new key(key.entitytype.application,
                responsecacheimpl.all_apps,
                keytype, currentrequestversion.get(), eurekaaccept.fromstring(eurekaaccept), regions
        );

        response response;
        if (acceptencoding != null && acceptencoding.contains(header_gzip_value)) {
            response = response.ok(responsecache.getgzip(cachekey))
                    .header(header_content_encoding, header_gzip_value)
                    .header(header_content_type, returnmediatype)
                    .build();
        } else {
            response = response.ok(responsecache.get(cachekey))
                    .build();
        }
        return response;
    }

虽然这个controller很长,但是与返回结果相关的也就这么几行

        key cachekey = new key(key.entitytype.application,
                responsecacheimpl.all_apps,
                keytype, currentrequestversion.get(), eurekaaccept.fromstring(eurekaaccept), regions
        );
                response response;
        if (acceptencoding != null && acceptencoding.contains(header_gzip_value)) {
            response = response.ok(responsecache.getgzip(cachekey))
                    .header(header_content_encoding, header_gzip_value)
                    .header(header_content_type, returnmediatype)
                    .build();
        } else {
            response = response.ok(responsecache.get(cachekey))
                    .build();
        }

这里有两个点,keyresponsecacheimpl

key

这个对象中包含了缓存键

    public key(entitytype entitytype, string entityname, keytype type, version v, eurekaaccept eurekaaccept, @nullable string[] regions) {
        this.regions = regions;
        this.entitytype = entitytype;
        this.entityname = entityname;
        this.requesttype = type;
        this.requestversion = v;
        this.eurekaaccept = eurekaaccept;
        hashkey = this.entitytype + this.entityname + (null != this.regions ? arrays.tostring(this.regions) : "")
                + requesttype.name() + requestversion.name() + this.eurekaaccept.name();
    }

这个hashkey最后的结果就是类似于这样的:applicationall_appsjsonv2full

responsecacheimpl

这个对象是响应缓存的实现
当hashkey创造好之后,responsecache.getgzip(cachekey)就是读取缓存并压缩的方法

    public byte[] getgzip(key key) {
        value payload = getvalue(key, shouldusereadonlyresponsecache);
        if (payload == null) {
            return null;
        }
        return payload.getgzipped();
    }

payload.getgzipped()是压缩的方法就不看了,看getvalue

value getvalue(final key key, boolean usereadonlycache) {
        value payload = null;
        try {
            if (usereadonlycache) {
                final value currentpayload = readonlycachemap.get(key);
                if (currentpayload != null) {
                    payload = currentpayload;
                } else {
                    payload = readwritecachemap.get(key);
                    readonlycachemap.put(key, payload);
                }
            } else {
                payload = readwritecachemap.get(key);
            }
        } catch (throwable t) {
            logger.error("cannot get value for key : {}", key, t);
        }
        return payload;
    }

大致就是先从readonlycachemap只读缓存中获取,如果不存在的话则从readwritecachemap读写缓存中获取

缓存生成

上面服务端处理请求时是直接从缓存中读取的,那么这个缓存又是在什么时候生成的呢?

读写缓存

缓存的生成在responsecacheimpl的构造方法中

this.readwritecachemap =
                cachebuilder.newbuilder().initialcapacity(1000)
                        .expireafterwrite(serverconfig.getresponsecacheautoexpirationinseconds(), timeunit.seconds)
                        .removallistener(new removallistener<key, value>() {
                            @override
                            public void onremoval(removalnotification<key, value> notification) {
                                key removedkey = notification.getkey();
                                if (removedkey.hasregions()) {
                                    key clonewithnoregions = removedkey.clonewithoutregions();
                                    regionspecifickeys.remove(clonewithnoregions, removedkey);
                                }
                            }
                        })
                        .build(new cacheloader<key, value>() {
                            @override
                            public value load(key key) throws exception {
                                if (key.hasregions()) {
                                    key clonewithnoregions = key.clonewithoutregions();
                                    regionspecifickeys.put(clonewithnoregions, key);
                                }
                                value value = generatepayload(key);
                                return value;
                            }
                        });

可以看到读写缓存的容量是1000,而缓存的生成方法在generatepayload方法中

    private value generatepayload(key key) {
        stopwatch tracer = null;
        try {
            string payload;
            switch (key.getentitytype()) {
                case application:
                    boolean isremoteregionrequested = key.hasregions();

                    if (all_apps.equals(key.getname())) {
                        if (isremoteregionrequested) {
                            tracer = serializeallappswithremoteregiontimer.start();
                            payload = getpayload(key, registry.getapplicationsfrommultipleregions(key.getregions()));
                        } else {
                            tracer = serializeallappstimer.start();
                            payload = getpayload(key, registry.getapplications());
                        }
                    } else if (all_apps_delta.equals(key.getname())) {
                        if (isremoteregionrequested) {
                            tracer = serializedeltaappswithremoteregiontimer.start();
                            versiondeltawithregions.incrementandget();
                            versiondeltawithregionslegacy.incrementandget();
                            payload = getpayload(key,
                                    registry.getapplicationdeltasfrommultipleregions(key.getregions()));
                        } else {
                            tracer = serializedeltaappstimer.start();
                            versiondelta.incrementandget();
                            versiondeltalegacy.incrementandget();
                            payload = getpayload(key, registry.getapplicationdeltas());
                        }
                    } else {
                        tracer = serializeoneapptimer.start();
                        payload = getpayload(key, registry.getapplication(key.getname()));
                    }
                    break;
                case vip:
                case svip:
                    tracer = serializeviptimer.start();
                    payload = getpayload(key, getapplicationsforvip(key, registry));
                    break;
                default:
                    logger.error("unidentified entity type: {} found in the cache key.", key.getentitytype());
                    payload = "";
                    break;
            }
            return new value(payload);
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
    }

这个方法的重点在这一句上payload = getpayload(key, registry.getapplications());

getapplications

public applications getapplications() {
        boolean disabletransparentfallback = serverconfig.disabletransparentfallbacktootherregion();
        if (disabletransparentfallback) {
            return getapplicationsfromlocalregiononly();
        } else {
            return getapplicationsfromallremoteregions();  // behavior of falling back to remote region can be disabled.
        }
    }
    

这里会进入getapplicationsfromlocalregiononly方法

    public applications getapplicationsfromlocalregiononly() {
        return getapplicationsfrommultipleregions(empty_str_array);
    }
    public applications getapplicationsfrommultipleregions(string[] remoteregions) {

        boolean includeremoteregion = null != remoteregions && remoteregions.length != 0;

        logger.debug("fetching applications registry with remote regions: {}, regions argument {}",
                includeremoteregion, remoteregions);

        if (includeremoteregion) {
            get_all_with_remote_regions_cache_miss.increment();
        } else {
            get_all_cache_miss.increment();
        }
        applications apps = new applications();
        apps.setversion(1l);
        for (entry<string, map<string, lease<instanceinfo>>> entry : registry.entryset()) {
            application app = null;

            if (entry.getvalue() != null) {
                for (entry<string, lease<instanceinfo>> stringleaseentry : entry.getvalue().entryset()) {
                    lease<instanceinfo> lease = stringleaseentry.getvalue();
                    if (app == null) {
                        app = new application(lease.getholder().getappname());
                    }
                    app.addinstance(decorateinstanceinfo(lease));
                }
            }
            if (app != null) {
                apps.addapplication(app);
            }
        }
        if (includeremoteregion) {
            for (string remoteregion : remoteregions) {
                remoteregionregistry remoteregistry = regionnamevsremoteregistry.get(remoteregion);
                if (null != remoteregistry) {
                    applications remoteapps = remoteregistry.getapplications();
                    for (application application : remoteapps.getregisteredapplications()) {
                        if (shouldfetchfromremoteregistry(application.getname(), remoteregion)) {
                            logger.info("application {}  fetched from the remote region {}",
                                    application.getname(), remoteregion);

                            application appinstancetillnow = apps.getregisteredapplications(application.getname());
                            if (appinstancetillnow == null) {
                                appinstancetillnow = new application(application.getname());
                                apps.addapplication(appinstancetillnow);
                            }
                            for (instanceinfo instanceinfo : application.getinstances()) {
                                appinstancetillnow.addinstance(instanceinfo);
                            }
                        } else {
                            logger.debug("application {} not fetched from the remote region {} as there exists a "
                                            + "whitelist and this app is not in the whitelist.",
                                    application.getname(), remoteregion);
                        }
                    }
                } else {
                    logger.warn("no remote registry available for the remote region {}", remoteregion);
                }
            }
        }
        apps.setappshashcode(apps.getreconcilehashcode());
        return apps;
    }

这里获取的时候分为3个部分:

  1. 第一个for循环中,根据当前服务端的租约信息获取所有的实例信息,每个实例信息使用application对象封装,多个application使用applications对象封装
  2. 第二个for循环则是处理如果请求中要获取某个分区的情况
  3. 设置所有实例的hashcode,这个hashcode是用来在增量获取的时候区分返回结果的

getpayload
这里则仅仅只是一个编码

private string getpayload(key key, applications apps) {
   // 获得编码器
   encoderwrapper encoderwrapper = servercodecs.getencoder(key.gettype(), key.geteurekaaccept());
   string result;
   try {
       // 编码
       result = encoderwrapper.encode(apps);
   } catch (exception e) {
       logger.error("failed to encode the payload for all apps", e);
       return "";
   }
   if(logger.isdebugenabled()) {
       logger.debug("new application cache entry {} with apps hashcode {}", key.tostringcompact(), apps.getappshashcode());
   }
   return result;
}
只读缓存

只读缓存是定时刷新的,同样也在responsecacheimpl的构造方法中

        if (shouldusereadonlyresponsecache) {
            timer.schedule(getcacheupdatetask(),
                    new date(((system.currenttimemillis() / responsecacheupdateintervalms) * responsecacheupdateintervalms)
                            + responsecacheupdateintervalms),
                    responsecacheupdateintervalms);
        }

这个刷新任务是这样的

    private timertask getcacheupdatetask() {
        return new timertask() {
            @override
            public void run() {
                logger.debug("updating the client cache from response cache");
                for (key key : readonlycachemap.keyset()) {
                    if (logger.isdebugenabled()) {
                        logger.debug("updating the client cache from response cache for key : {} {} {} {}",
                                key.getentitytype(), key.getname(), key.getversion(), key.gettype());
                    }
                    try {
                        currentrequestversion.set(key.getversion());
                        value cachevalue = readwritecachemap.get(key);
                        value currentcachevalue = readonlycachemap.get(key);
                        if (cachevalue != currentcachevalue) {
                            readonlycachemap.put(key, cachevalue);
                        }
                    } catch (throwable th) {
                        logger.error("error while updating the client cache from response cache for key {}", key.tostringcompact(), th);
                    }
                }
            }
        };
    }

观察for循环里面的内容,发现只读缓存的内容都是基于读写缓存来的

增量拉取

增量拉取的线程调度和发送心跳的线程调度是在一个方法initscheduledtasks中执行的,代码如下:

      int registryfetchintervalseconds = clientconfig.getregistryfetchintervalseconds();
            int expbackoffbound = clientconfig.getcacherefreshexecutorexponentialbackoffbound();
            scheduler.schedule(
                    new timedsupervisortask(
                            "cacherefresh",
                            scheduler,
                            cacherefreshexecutor,
                            registryfetchintervalseconds,
                            timeunit.seconds,
                            expbackoffbound,
                            new cacherefreshthread()
                    ),
                    registryfetchintervalseconds, timeunit.seconds);

看一下线程cacherefreshthread

 class cacherefreshthread implements runnable {
        public void run() {
            refreshregistry();
        }
    }
   void refreshregistry() {
        try {
            boolean isfetchingremoteregionregistries = isfetchingremoteregionregistries();

            boolean remoteregionsmodified = false;
           //省略了一部分无关代码
            //核心
            boolean success = fetchregistry(remoteregionsmodified);
            if (success) {
                registrysize = localregionapps.get().size();
                lastsuccessfulregistryfetchtimestamp = system.currenttimemillis();
            }

            if (logger.isdebugenabled()) {
                stringbuilder allappshashcodes = new stringbuilder();
                allappshashcodes.append("local region apps hashcode: ");
                allappshashcodes.append(localregionapps.get().getappshashcode());
                allappshashcodes.append(", is fetching remote regions? ");
                allappshashcodes.append(isfetchingremoteregionregistries);
                for (map.entry<string, applications> entry : remoteregionvsapps.entryset()) {
                    allappshashcodes.append(", remote region: ");
                    allappshashcodes.append(entry.getkey());
                    allappshashcodes.append(" , apps hashcode: ");
                    allappshashcodes.append(entry.getvalue().getappshashcode());
                }
                logger.debug("completed cache refresh task for discovery. all apps hash code is {} ",
                        allappshashcodes);
            }
        } catch (throwable e) {
            logger.error("cannot fetch registry from server", e);
        }        
    }

核心在fetchregistry方法,这个在上面已经说过了,只不过部分拉取获取调用的接口是getandupdatedelta而已

private void getandupdatedelta(applications applications) throws throwable {
        long currentupdategeneration = fetchregistrygeneration.get();

        applications delta = null;
        eurekahttpresponse<applications> httpresponse = eurekatransport.queryclient.getdelta(remoteregionsref.get());
        if (httpresponse.getstatuscode() == status.ok.getstatuscode()) {
            delta = httpresponse.getentity();
        }

        if (delta == null) {
            logger.warn("the server does not allow the delta revision to be applied because it is not safe. "
                    + "hence got the full registry.");
            getandstorefullregistry();
        } else if (fetchregistrygeneration.compareandset(currentupdategeneration, currentupdategeneration + 1)) {
            logger.debug("got delta update with apps hashcode {}", delta.getappshashcode());
            string reconcilehashcode = "";
            if (fetchregistryupdatelock.trylock()) {
                try {
                    updatedelta(delta);
                    reconcilehashcode = getreconcilehashcode(applications);
                } finally {
                    fetchregistryupdatelock.unlock();
                }
            } else {
                logger.warn("cannot acquire update lock, aborting getandupdatedelta");
            }
            // there is a diff in number of instances for some reason
            if (!reconcilehashcode.equals(delta.getappshashcode()) || clientconfig.shouldlogdeltadiff()) {
                reconcileandlogdifference(delta, reconcilehashcode);  // this makes a remotecall
            }
        } else {
            logger.warn("not updating application delta as another thread is updating it already");
            logger.debug("ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getappshashcode());
        }
    }

先看服务端的处理,然后再看如何处理结果吧

服务端处理增量拉取
@path("delta")
    @get
    public response getcontainerdifferential(
            @pathparam("version") string version,
            @headerparam(header_accept) string acceptheader,
            @headerparam(header_accept_encoding) string acceptencoding,
            @headerparam(eurekaaccept.http_x_eureka_accept) string eurekaaccept,
            @context uriinfo uriinfo, @nullable @queryparam("regions") string regionsstr) {

        boolean isremoteregionrequested = null != regionsstr && !regionsstr.isempty();

        // if the delta flag is disabled in discovery or if the lease expiration
        // has been disabled, redirect clients to get all instances
        if ((serverconfig.shoulddisabledelta()) || (!registry.shouldallowaccess(isremoteregionrequested))) {
            return response.status(status.forbidden).build();
        }

        string[] regions = null;
        if (!isremoteregionrequested) {
            eurekamonitors.get_all_delta.increment();
        } else {
            regions = regionsstr.tolowercase().split(",");
            arrays.sort(regions); // so we don't have different caches for same regions queried in different order.
            eurekamonitors.get_all_delta_with_remote_regions.increment();
        }

        currentrequestversion.set(version.toenum(version));
        keytype keytype = key.keytype.json;
        string returnmediatype = mediatype.application_json;
        if (acceptheader == null || !acceptheader.contains(header_json_value)) {
            keytype = key.keytype.xml;
            returnmediatype = mediatype.application_xml;
        }

        key cachekey = new key(key.entitytype.application,
                responsecacheimpl.all_apps_delta,
                keytype, currentrequestversion.get(), eurekaaccept.fromstring(eurekaaccept), regions
        );

        if (acceptencoding != null
                && acceptencoding.contains(header_gzip_value)) {
            return response.ok(responsecache.getgzip(cachekey))
                    .header(header_content_encoding, header_gzip_value)
                    .header(header_content_type, returnmediatype)
                    .build();
        } else {
            return response.ok(responsecache.get(cachekey))
                    .build();
        }
    }

这里的处理逻辑跟全量获取大部分逻辑都是一样的,只有一些几点不同:

  1. hashkey是applicationall_apps_deltajsonv2full
  2. 获取实例列表的时候走的是下面的分支
if (all_apps.equals(key.getname())) {
                        if (isremoteregionrequested) {
                            tracer = serializeallappswithremoteregiontimer.start();
                            payload = getpayload(key, registry.getapplicationsfrommultipleregions(key.getregions()));
                        } else {
                            tracer = serializeallappstimer.start();
                            payload = getpayload(key, registry.getapplications());
                        }
                    } else if (all_apps_delta.equals(key.getname())) {
                        if (isremoteregionrequested) {
                            tracer = serializedeltaappswithremoteregiontimer.start();
                            versiondeltawithregions.incrementandget();
                            versiondeltawithregionslegacy.incrementandget();
                            payload = getpayload(key,
                                    registry.getapplicationdeltasfrommultipleregions(key.getregions()));
                        } else {
                            tracer = serializedeltaappstimer.start();
                            versiondelta.incrementandget();
                            versiondeltalegacy.incrementandget();
                            payload = getpayload(key, registry.getapplicationdeltas());
                        }
                    }

看看getapplicationdeltas方法吧

public applications getapplicationdeltas() {
        get_all_cache_miss_delta.increment();
        applications apps = new applications();
        apps.setversion(responsecache.getversiondelta().get());
        map<string, application> applicationinstancesmap = new hashmap<string, application>();
        try {
            write.lock();
            iterator<recentlychangeditem> iter = this.recentlychangedqueue.iterator();
            logger.debug("the number of elements in the delta queue is : {}",
                    this.recentlychangedqueue.size());
            while (iter.hasnext()) {
                lease<instanceinfo> lease = iter.next().getleaseinfo();
                instanceinfo instanceinfo = lease.getholder();
                logger.debug(
                        "the instance id {} is found with status {} and actiontype {}",
                        instanceinfo.getid(), instanceinfo.getstatus().name(), instanceinfo.getactiontype().name());
                application app = applicationinstancesmap.get(instanceinfo
                        .getappname());
                if (app == null) {
                    app = new application(instanceinfo.getappname());
                    applicationinstancesmap.put(instanceinfo.getappname(), app);
                    apps.addapplication(app);
                }
                app.addinstance(decorateinstanceinfo(lease));
            }

            boolean disabletransparentfallback = serverconfig.disabletransparentfallbacktootherregion();

            if (!disabletransparentfallback) {
                applications allappsinlocalregion = getapplications(false);

                for (remoteregionregistry remoteregistry : this.regionnamevsremoteregistry.values()) {
                    applications applications = remoteregistry.getapplicationdeltas();
                    for (application application : applications.getregisteredapplications()) {
                        application appinlocalregistry =
                                allappsinlocalregion.getregisteredapplications(application.getname());
                        if (appinlocalregistry == null) {
                            apps.addapplication(application);
                        }
                    }
                }
            }

            applications allapps = getapplications(!disabletransparentfallback);
            apps.setappshashcode(allapps.getreconcilehashcode());
            return apps;
        } finally {
            write.unlock();
        }
    }

与全量获取不同的是这个最终的结果是从最近租约变更记录队列recentlychangedqueue里来的,其他的流程则差不多

处理增量拉取结果

结果的处理代码

if (delta == null) {
            logger.warn("the server does not allow the delta revision to be applied because it is not safe. "
                    + "hence got the full registry.");
            getandstorefullregistry();
        } else if (fetchregistrygeneration.compareandset(currentupdategeneration, currentupdategeneration + 1)) {
            logger.debug("got delta update with apps hashcode {}", delta.getappshashcode());
            string reconcilehashcode = "";
            if (fetchregistryupdatelock.trylock()) {
                try {
                    updatedelta(delta);
                    reconcilehashcode = getreconcilehashcode(applications);
                } finally {
                    fetchregistryupdatelock.unlock();
                }
            } else {
                logger.warn("cannot acquire update lock, aborting getandupdatedelta");
            }
            // there is a diff in number of instances for some reason
            if (!reconcilehashcode.equals(delta.getappshashcode()) || clientconfig.shouldlogdeltadiff()) {
                reconcileandlogdifference(delta, reconcilehashcode);  // this makes a remotecall
            }
        }
updatedelta
    private void updatedelta(applications delta) {
        int deltacount = 0;
        for (application app : delta.getregisteredapplications()) {
            for (instanceinfo instance : app.getinstances()) {
                applications applications = getapplications();
                string instanceregion = instanceregionchecker.getinstanceregion(instance);
                if (!instanceregionchecker.islocalregion(instanceregion)) {
                    applications remoteapps = remoteregionvsapps.get(instanceregion);
                    if (null == remoteapps) {
                        remoteapps = new applications();
                        remoteregionvsapps.put(instanceregion, remoteapps);
                    }
                    applications = remoteapps;
                }

                ++deltacount;
                if (actiontype.added.equals(instance.getactiontype())) {
                    application existingapp = applications.getregisteredapplications(instance.getappname());
                    if (existingapp == null) {
                        applications.addapplication(app);
                    }
                    logger.debug("added instance {} to the existing apps in region {}", instance.getid(), instanceregion);
                    applications.getregisteredapplications(instance.getappname()).addinstance(instance);
                } else if (actiontype.modified.equals(instance.getactiontype())) {
                    application existingapp = applications.getregisteredapplications(instance.getappname());
                    if (existingapp == null) {
                        applications.addapplication(app);
                    }
                    logger.debug("modified instance {} to the existing apps ", instance.getid());

                    applications.getregisteredapplications(instance.getappname()).addinstance(instance);

                } else if (actiontype.deleted.equals(instance.getactiontype())) {
                    application existingapp = applications.getregisteredapplications(instance.getappname());
                    if (existingapp == null) {
                        applications.addapplication(app);
                    }
                    logger.debug("deleted instance {} to the existing apps ", instance.getid());
                    applications.getregisteredapplications(instance.getappname()).removeinstance(instance);
                }
            }
        }
        logger.debug("the total number of instances fetched by the delta processor : {}", deltacount);

        getapplications().setversion(delta.getversion());
        getapplications().shuffleinstances(clientconfig.shouldfilteronlyupinstances());

        for (applications applications : remoteregionvsapps.values()) {
            applications.setversion(delta.getversion());
            applications.shuffleinstances(clientconfig.shouldfilteronlyupinstances());
        }
    }

大致处理流程为:

  1. 获取本地缓存实例
  2. 如果不存在远程拉取到的实例的分区则在remoteregionvsapps对象中新建分区的key
  3. 根据远程实例的状态(添加、修改、删除)分别进行本地实例状态的更新
  4. 实例的过滤