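Eureka: A Source-Code Walkthrough of Fetching the Service List
In the previous article, "EurekaClient auto-configuration and startup flow analysis", we noted that the constructor of the DiscoveryClient class sets up a refresh thread and pulls registry information from the server.
These correspond to the two ways Eureka obtains the service list:
- Full fetch: the whole registry is pulled when the Eureka client starts
- Delta fetch: a scheduled task pulls changes periodically
Full fetch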
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }
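The full fetch goes through the fetchRegistry method. If that call does not succeed, fetchRegistryFromBackup pulls the registry from a backup source instead. The backup source is an implementation of the BackupRegistry interface, but Eureka does not ship a working implementation by default.
Let's follow the fetch flow: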
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
        try {
            Applications applications = getApplications();
            if (clientConfig.shouldDisableDelta() // delta fetch is disabled
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch // full fetch is forced
                    || (applications == null) // nothing is cached locally
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) // Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                getAndStoreFullRegistry();
            } else {
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
        // ...
    }
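- First, the parameter forceFullRegistryFetch says whether a full fetch is being forced; when it is false, a delta fetch is attempted
- The method then reads the locally cached applications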
    private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();

    public Applications getApplications() {
        return localRegionApps.get();
    }
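So all instances are cached in the localRegionApps object.
- Next, a set of conditions decides whether a full fetch is required. Even when the parameter asks for a delta fetch, a full fetch is still performed if any of these conditions holds (delta disabled, a single VIP address configured, or an empty or outdated local cache)
- Then the current number of instances is logged
- Finally, the status of the fetched instances is updated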
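The decision described in the list above can be condensed into a single predicate. The following is a minimal sketch rather than Eureka's own code: the class FetchDecision and its parameters are hypothetical stand-ins for the values that fetchRegistry reads from clientConfig and from the local cache.

```java
final class FetchDecision {

    /**
     * Returns true when a full fetch is needed, mirroring the conditions
     * checked in fetchRegistry. All parameters are illustrative stand-ins.
     */
    static boolean needsFullFetch(boolean deltaDisabled,
                                  String singleVipAddress,
                                  boolean forceFull,
                                  Integer cachedAppCount, // null means nothing is cached yet
                                  long cachedVersion) {
        return deltaDisabled
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFull
                || cachedAppCount == null
                || cachedAppCount == 0
                || cachedVersion == -1;
    }

    public static void main(String[] args) {
        // Empty local cache: a full fetch happens even though forceFull is false.
        System.out.println(needsFullFetch(false, null, false, 0, 1)); // prints true
        // Populated cache with delta enabled: a delta fetch is enough.
        System.out.println(needsFullFetch(false, null, false, 3, 1)); // prints false
    }
}
```

This is why the very first call, fetchRegistry(false), still ends up as a full fetch: the local cache is empty at startup.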
Full fetch handling
    private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        logger.info("Getting all instance registry info from the eureka server");

        Applications apps = null;
        // issue the request
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());

        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            // cache the result
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }
The HTTP call it makes is straightforward:
    public EurekaHttpResponse<Applications> getApplications(String... regions) {
        return getApplicationsInternal("apps/", regions);
    }

    private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
        ClientResponse response = null;
        String regionsParamValue = null;
        try {
            WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
            if (regions != null && regions.length > 0) {
                regionsParamValue = StringUtil.join(regions);
                webResource = webResource.queryParam("regions", regionsParamValue);
            }
            Builder requestBuilder = webResource.getRequestBuilder();
            addExtraHeaders(requestBuilder);
            response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON

            Applications applications = null;
            if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
                applications = response.getEntity(Applications.class);
            }
            return anEurekaHttpResponse(response.getStatus(), Applications.class)
                    .headers(headersOf(response))
                    .entity(applications)
                    .build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
                        serviceUrl, urlPath,
                        regionsParamValue == null ? "" : "regions=" + regionsParamValue,
                        response == null ? "N/A" : response.getStatus()
                );
            }
            if (response != null) {
                response.close();
            }
        }
    }
Server-side handling of a full fetch
On the server, the controller for the full fetch lives in the ApplicationsResource class:
    @GET
    public Response getContainers(@PathParam("version") String version,
                                  @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                  @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                  @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                  @Context UriInfo uriInfo,
                                  @Nullable @QueryParam("regions") String regionsStr) {

        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
        String[] regions = null;
        if (!isRemoteRegionRequested) {
            EurekaMonitors.GET_ALL.increment();
        } else {
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
        }

        // Check if the server allows the access to the registry. The server can
        // restrict access if it is not
        // ready to serve traffic depending on various reasons.
        if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
            return Response.status(Status.FORBIDDEN).build();
        }
        CurrentRequestVersion.set(Version.toEnum(version));
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }

        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS,
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );

        Response response;
        if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
            response = Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
            response = Response.ok(responseCache.get(cacheKey))
                    .build();
        }
        return response;
    }
Although this controller method is long, only a few lines determine the returned result:
    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    Response response;
    if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        response = Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        response = Response.ok(responseCache.get(cacheKey))
                .build();
    }
There are two things to look at here: Key and ResponseCacheImpl.
Key
This object holds the cache key:
    public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
        this.regions = regions;
        this.entityType = entityType;
        this.entityName = entityName;
        this.requestType = type;
        this.requestVersion = v;
        this.eurekaAccept = eurekaAccept;
        hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
                + requestType.name() + requestVersion.name() + this.eurekaAccept.name();
    }
The resulting hashKey looks something like this: ApplicationALL_APPSJSONV2full
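To make the concatenation concrete, here is a small standalone sketch of how such a key string comes together. The enums below are hypothetical stand-ins whose constant names are assumed to match what Eureka uses; only the concatenation order is taken from the constructor shown above.

```java
import java.util.Arrays;

final class CacheKeySketch {
    // Hypothetical stand-ins for the types used by the Key constructor.
    enum EntityType { Application, VIP, SVIP }
    enum KeyType { JSON, XML }
    enum Version { V1, V2 }
    enum Accept { full, compact }

    /** Concatenates the parts in the same order as the Key constructor. */
    static String hashKey(EntityType entityType, String entityName, String[] regions,
                          KeyType type, Version version, Accept accept) {
        return entityType + entityName
                + (regions != null ? Arrays.toString(regions) : "")
                + type.name() + version.name() + accept.name();
    }

    public static void main(String[] args) {
        // No regions requested.
        System.out.println(hashKey(EntityType.Application, "ALL_APPS", null,
                KeyType.JSON, Version.V2, Accept.full)); // prints ApplicationALL_APPSJSONV2full
        // With a region, the sorted region array becomes part of the key.
        System.out.println(hashKey(EntityType.Application, "ALL_APPS", new String[] {"us-east-1"},
                KeyType.JSON, Version.V2, Accept.full)); // prints ApplicationALL_APPS[us-east-1]JSONV2full
    }
}
```

Because the regions are part of the key, the controller sorts them first so that the same set of regions in a different order maps to the same cache entry.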
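ResponseCacheImpl
This class is the implementation of the response cache.
Once the cache key has been built, responseCache.getGZIP(cacheKey) reads the cached value and returns it in gzip-compressed form: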
    public byte[] getGZIP(Key key) {
        Value payload = getValue(key, shouldUseReadOnlyResponseCache);
        if (payload == null) {
            return null;
        }
        return payload.getGzipped();
    }
payload.getGzipped() only does the compression, so we skip it and look at getValue:
    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            if (useReadOnlyCache) {
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key : {}", key, t);
        }
        return payload;
    }
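In short, the value is looked up in the read-only cache readOnlyCacheMap first. On a miss it is loaded from the read-write cache readWriteCacheMap and copied into the read-only cache. When the read-only cache is disabled, the read-write cache is queried directly.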
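The lookup order can be illustrated with a minimal two-level cache. This is a sketch under simplifying assumptions, not the ResponseCacheImpl code: TwoLevelCache is a hypothetical class, both levels are plain ConcurrentHashMap instances, and the loader function stands in for the loading behaviour of the read-write cache.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class TwoLevelCache<K, V> {
    private final Map<K, V> readOnly = new ConcurrentHashMap<>();
    private final Map<K, V> readWrite = new ConcurrentHashMap<>();
    private final Function<K, V> loader; // stands in for payload generation

    TwoLevelCache(Function<K, V> loader) {
        this.loader = loader;
    }

    V get(K key, boolean useReadOnly) {
        if (!useReadOnly) {
            // Read-only level disabled: go straight to the loading level.
            return readWrite.computeIfAbsent(key, loader);
        }
        V cached = readOnly.get(key);
        if (cached != null) {
            return cached;
        }
        // Miss: load through the read-write level, then remember the value.
        V loaded = readWrite.computeIfAbsent(key, loader);
        if (loaded != null) {
            readOnly.put(key, loaded);
        }
        return loaded;
    }

    public static void main(String[] args) {
        TwoLevelCache<String, String> cache = new TwoLevelCache<>(key -> "payload-for-" + key);
        System.out.println(cache.get("ALL_APPS", true)); // prints payload-for-ALL_APPS
    }
}
```

The trade-off of the extra level is staleness: a value served from the read-only level stays there until something refreshes it.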
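Cache population
The server reads straight from the cache when it handles a request, so when is that cache populated?
Read-write cache
The read-write cache is built in the constructor of ResponseCacheImpl: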
    this.readWriteCacheMap =
            CacheBuilder.newBuilder().initialCapacity(1000)
                    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                    .removalListener(new RemovalListener<Key, Value>() {
                        @Override
                        public void onRemoval(RemovalNotification<Key, Value> notification) {
                            Key removedKey = notification.getKey();
                            if (removedKey.hasRegions()) {
                                Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                            }
                        }
                    })
                    .build(new CacheLoader<Key, Value>() {
                        @Override
                        public Value load(Key key) throws Exception {
                            if (key.hasRegions()) {
                                Key cloneWithNoRegions = key.cloneWithoutRegions();
                                regionSpecificKeys.put(cloneWithNoRegions, key);
                            }
                            Value value = generatePayload(key);
                            return value;
                        }
                    });
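The read-write cache is created with an initial capacity of 1000 (a sizing hint, not an upper limit), and its entries expire a configurable number of seconds after they are written. On a cache miss the value is produced by the generatePayload method: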
    private Value generatePayload(Key key) {
        Stopwatch tracer = null;
        try {
            String payload;
            switch (key.getEntityType()) {
                case Application:
                    boolean isRemoteRegionRequested = key.hasRegions();

                    if (ALL_APPS.equals(key.getName())) {
                        if (isRemoteRegionRequested) {
                            tracer = serializeAllAppsWithRemoteRegionTimer.start();
                            payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                        } else {
                            tracer = serializeAllAppsTimer.start();
                            payload = getPayLoad(key, registry.getApplications());
                        }
                    } else if (ALL_APPS_DELTA.equals(key.getName())) {
                        if (isRemoteRegionRequested) {
                            tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                            versionDeltaWithRegions.incrementAndGet();
                            versionDeltaWithRegionsLegacy.incrementAndGet();
                            payload = getPayLoad(key,
                                    registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                        } else {
                            tracer = serializeDeltaAppsTimer.start();
                            versionDelta.incrementAndGet();
                            versionDeltaLegacy.incrementAndGet();
                            payload = getPayLoad(key, registry.getApplicationDeltas());
                        }
                    } else {
                        tracer = serializeOneApptimer.start();
                        payload = getPayLoad(key, registry.getApplication(key.getName()));
                    }
                    break;
                case VIP:
                case SVIP:
                    tracer = serializeViptimer.start();
                    payload = getPayLoad(key, getApplicationsForVip(key, registry));
                    break;
                default:
                    logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
                    payload = "";
                    break;
            }
            return new Value(payload);
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
    }
For the full fetch, the line that matters in this method is payload = getPayLoad(key, registry.getApplications());
getApplications
    public Applications getApplications() {
        boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
        if (disableTransparentFallback) {
            return getApplicationsFromLocalRegionOnly();
        } else {
            return getApplicationsFromAllRemoteRegions();  // Behavior of falling back to remote region can be disabled.
        }
    }
Here we follow the getApplicationsFromLocalRegionOnly branch, which is taken when transparent fallback to other regions is disabled:
    public Applications getApplicationsFromLocalRegionOnly() {
        return getApplicationsFromMultipleRegions(EMPTY_STR_ARRAY);
    }

    public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {

        boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;

        logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
                includeRemoteRegion, remoteRegions);

        if (includeRemoteRegion) {
            GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
        } else {
            GET_ALL_CACHE_MISS.increment();
        }
        Applications apps = new Applications();
        apps.setVersion(1L);
        for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
            Application app = null;

            if (entry.getValue() != null) {
                for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
                    Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
                    if (app == null) {
                        app = new Application(lease.getHolder().getAppName());
                    }
                    app.addInstance(decorateInstanceInfo(lease));
                }
            }
            if (app != null) {
                apps.addApplication(app);
            }
        }
        if (includeRemoteRegion) {
            for (String remoteRegion : remoteRegions) {
                RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                if (null != remoteRegistry) {
                    Applications remoteApps = remoteRegistry.getApplications();
                    for (Application application : remoteApps.getRegisteredApplications()) {
                        if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                            logger.info("Application {} fetched from the remote region {}",
                                    application.getName(), remoteRegion);

                            Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                            if (appInstanceTillNow == null) {
                                appInstanceTillNow = new Application(application.getName());
                                apps.addApplication(appInstanceTillNow);
                            }
                            for (InstanceInfo instanceInfo : application.getInstances()) {
                                appInstanceTillNow.addInstance(instanceInfo);
                            }
                        } else {
                            logger.debug("Application {} not fetched from the remote region {} as there exists a "
                                            + "whitelist and this app is not in the whitelist.",
                                    application.getName(), remoteRegion);
                        }
                    }
                } else {
                    logger.warn("No remote registry available for the remote region {}", remoteRegion);
                }
            }
        }
        apps.setAppsHashCode(apps.getReconcileHashCode());
        return apps;
    }
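The lookup has three parts:
- The first for loop walks the server's lease registry and collects every instance. The instances of one service are grouped into an Application object, and all Application objects are wrapped in a single Applications object
- The second for loop handles the case where the request also asks for instances from remote regions
- Finally, the hash code of the whole set is computed. The delta fetch later uses this hash code to check whether the client's view still matches the server's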
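To give a feel for what such a hash code can look like, the sketch below summarises instances as a count per status. This is an assumption about the format made for illustration, since the article does not show getReconcileHashCode; the class ReconcileHashSketch and the underscore-delimited layout are hypothetical here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ReconcileHashSketch {

    /** Builds a status summary such as DOWN_1_UP_2_ from a list of instance statuses. */
    static String reconcileHash(List<String> instanceStatuses) {
        // TreeMap keeps statuses sorted, so the same population always yields the same string.
        Map<String, Integer> countsByStatus = new TreeMap<>();
        for (String status : instanceStatuses) {
            countsByStatus.merge(status, 1, Integer::sum);
        }
        StringBuilder hash = new StringBuilder();
        for (Map.Entry<String, Integer> entry : countsByStatus.entrySet()) {
            hash.append(entry.getKey()).append('_').append(entry.getValue()).append('_');
        }
        return hash.toString();
    }

    public static void main(String[] args) {
        System.out.println(reconcileHash(Arrays.asList("UP", "DOWN", "UP"))); // prints DOWN_1_UP_2_
    }
}
```

A summary like this is cheap to compare: if the client and the server disagree on it after a delta has been applied, the client knows its local view has drifted.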
getPayLoad
This method only encodes the result:
    private String getPayLoad(Key key, Applications apps) {
        // obtain the encoder
        EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());
        String result;
        try {
            // encode
            result = encoderWrapper.encode(apps);
        } catch (Exception e) {
            logger.error("Failed to encode the payload for all apps", e);
            return "";
        }
        if (logger.isDebugEnabled()) {
            logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());
        }
        return result;
    }
Read-only cache
The read-only cache is refreshed on a timer, which is also set up in the constructor of ResponseCacheImpl:
    if (shouldUseReadOnlyResponseCache) {
        timer.schedule(getCacheUpdateTask(),
                new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                        + responseCacheUpdateIntervalMs),
                responseCacheUpdateIntervalMs);
    }
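The start time passed to timer.schedule is worth a second look: integer division followed by multiplication rounds the current time down to a multiple of the interval, and adding one interval moves it to the next boundary. A minimal sketch of that arithmetic, with a hypothetical helper name and example values chosen only for illustration:

```java
final class AlignedStart {

    /** Returns the next multiple of intervalMs that is strictly after nowMs. */
    static long firstRunAt(long nowMs, long intervalMs) {
        // Integer division drops the remainder, so this rounds down to a boundary first.
        return (nowMs / intervalMs) * intervalMs + intervalMs;
    }

    public static void main(String[] args) {
        // 61.5 s into the epoch with a 30 s interval: the next boundary is at 90 s.
        System.out.println(firstRunAt(61_500L, 30_000L)); // prints 90000
        // Exactly on a boundary: the run is still pushed to the following one.
        System.out.println(firstRunAt(60_000L, 30_000L)); // prints 90000
    }
}
```

So the first refresh is aligned to a wall-clock multiple of the interval rather than to the moment the server happened to start.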
The refresh task looks like this:
    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                logger.debug("Updating the client cache from response cache");
                for (Key key : readOnlyCacheMap.keySet()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                    }
                    try {
                        CurrentRequestVersion.set(key.getVersion());
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        if (cacheValue != currentCacheValue) {
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (Throwable th) {
                        logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                    }
                }
            }
        };
    }
Looking at the body of the for loop, every entry in the read-only cache is copied from the read-write cache.
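The refresh step can be reduced to a few lines. The sketch below is a simplification under stated assumptions: ReadOnlySync is a hypothetical class, and the read-write level is a plain map, so a key missing there is simply skipped instead of being loaded on demand.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ReadOnlySync {

    /**
     * For every key already present in the read-only map, copies over the
     * current read-write value when it is a different object.
     * Returns the number of entries that were replaced.
     */
    static <K, V> int sync(Map<K, V> readOnly, Map<K, V> readWrite) {
        int replaced = 0;
        for (K key : readOnly.keySet()) {
            V fresh = readWrite.get(key);
            // Identity comparison, as in the refresh task: a regenerated payload is a new object.
            if (fresh != null && fresh != readOnly.get(key)) {
                readOnly.put(key, fresh);
                replaced++;
            }
        }
        return replaced;
    }

    public static void main(String[] args) {
        Map<String, String> readOnly = new ConcurrentHashMap<>();
        Map<String, String> readWrite = new ConcurrentHashMap<>();
        readOnly.put("ALL_APPS", "old payload");
        readWrite.put("ALL_APPS", "new payload");
        readWrite.put("ALL_APPS_DELTA", "never requested");
        System.out.println(sync(readOnly, readWrite)); // prints 1
        System.out.println(readOnly.keySet());         // prints [ALL_APPS]
    }
}
```

Note that only keys that have already been requested are refreshed; the task iterates over the read-only key set, not over the read-write cache.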
Delta fetch
The delta-fetch task and the heartbeat task are both scheduled in the same method, initScheduledTasks. The relevant code is:
    int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
    int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
    scheduler.schedule(
            new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            ),
            registryFetchIntervalSeconds, TimeUnit.SECONDS);
Now look at the CacheRefreshThread task:
    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

    void refreshRegistry() {
        try {
            boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

            boolean remoteRegionsModified = false;
            // some unrelated code is omitted here

            // the core call
            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }

            if (logger.isDebugEnabled()) {
                StringBuilder allAppsHashCodes = new StringBuilder();
                allAppsHashCodes.append("Local region apps hashcode: ");
                allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
                allAppsHashCodes.append(", is fetching remote regions? ");
                allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
                for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                    allAppsHashCodes.append(", Remote region: ");
                    allAppsHashCodes.append(entry.getKey());
                    allAppsHashCodes.append(" , apps hashcode: ");
                    allAppsHashCodes.append(entry.getValue().getAppsHashCode());
                }
                logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                        allAppsHashCodes);
            }
        } catch (Throwable e) {
            logger.error("Cannot fetch registry from server", e);
        }
    }
The core is the fetchRegistry method covered above. The only difference is that the delta path ends up calling getAndUpdateDelta:
    private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        Applications delta = null;
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }

        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // There is a diff in number of instances for some reason
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }
Let's look at the server side first, and then at how the client handles the result.
Server-side handling of a delta fetch
    @Path("delta")
    @GET
    public Response getContainerDifferential(
            @PathParam("version") String version,
            @HeaderParam(HEADER_ACCEPT) String acceptHeader,
            @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
            @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
            @Context UriInfo uriInfo,
            @Nullable @QueryParam("regions") String regionsStr) {

        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();

        // If the delta flag is disabled in discovery or if the lease expiration
        // has been disabled, redirect clients to get all instances
        if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
            return Response.status(Status.FORBIDDEN).build();
        }

        String[] regions = null;
        if (!isRemoteRegionRequested) {
            EurekaMonitors.GET_ALL_DELTA.increment();
        } else {
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
        }

        CurrentRequestVersion.set(Version.toEnum(version));
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }

        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS_DELTA,
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );

        if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
            return Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
            return Response.ok(responseCache.get(cacheKey))
                    .build();
        }
    }
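Most of this logic is the same as for the full fetch. There are only a few differences:
- The hashKey is ApplicationALL_APPS_DELTAJSONV2full
- The instance list is produced by the ALL_APPS_DELTA branch below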
    if (ALL_APPS.equals(key.getName())) {
        if (isRemoteRegionRequested) {
            tracer = serializeAllAppsWithRemoteRegionTimer.start();
            payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
        } else {
            tracer = serializeAllAppsTimer.start();
            payload = getPayLoad(key, registry.getApplications());
        }
    } else if (ALL_APPS_DELTA.equals(key.getName())) {
        if (isRemoteRegionRequested) {
            tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
            versionDeltaWithRegions.incrementAndGet();
            versionDeltaWithRegionsLegacy.incrementAndGet();
            payload = getPayLoad(key,
                    registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
        } else {
            tracer = serializeDeltaAppsTimer.start();
            versionDelta.incrementAndGet();
            versionDeltaLegacy.incrementAndGet();
            payload = getPayLoad(key, registry.getApplicationDeltas());
        }
    }
Now look at the getApplicationDeltas method:
    public Applications getApplicationDeltas() {
        GET_ALL_CACHE_MISS_DELTA.increment();
        Applications apps = new Applications();
        apps.setVersion(responseCache.getVersionDelta().get());
        Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
        try {
            write.lock();
            Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
            logger.debug("The number of elements in the delta queue is : {}",
                    this.recentlyChangedQueue.size());
            while (iter.hasNext()) {
                Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
                InstanceInfo instanceInfo = lease.getHolder();
                logger.debug(
                        "The instance id {} is found with status {} and actiontype {}",
                        instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
                Application app = applicationInstancesMap.get(instanceInfo
                        .getAppName());
                if (app == null) {
                    app = new Application(instanceInfo.getAppName());
                    applicationInstancesMap.put(instanceInfo.getAppName(), app);
                    apps.addApplication(app);
                }
                app.addInstance(decorateInstanceInfo(lease));
            }

            boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();

            if (!disableTransparentFallback) {
                Applications allAppsInLocalRegion = getApplications(false);

                for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
                    Applications applications = remoteRegistry.getApplicationDeltas();
                    for (Application application : applications.getRegisteredApplications()) {
                        Application appInLocalRegistry =
                                allAppsInLocalRegion.getRegisteredApplications(application.getName());
                        if (appInLocalRegistry == null) {
                            apps.addApplication(application);
                        }
                    }
                }
            }

            Applications allApps = getApplications(!disableTransparentFallback);
            apps.setAppsHashCode(allApps.getReconcileHashCode());
            return apps;
        } finally {
            write.unlock();
        }
    }
Unlike the full fetch, the result here is built from recentlyChangedQueue, the queue that records recently changed leases. The rest of the flow is much the same.
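The idea of answering a delta request from a queue of recent changes can be sketched as follows. This is a simplified illustration, not the registry code: RecentChangesSketch, Change and Action are hypothetical names, instances are reduced to plain strings, and trimming old entries from the queue is not modelled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class RecentChangesSketch {
    enum Action { ADDED, MODIFIED, DELETED }

    /** One recorded change to an instance. */
    static final class Change {
        final String appName;
        final String instanceId;
        final Action action;

        Change(String appName, String instanceId, Action action) {
            this.appName = appName;
            this.instanceId = instanceId;
            this.action = action;
        }
    }

    private final Queue<Change> recentlyChanged = new ConcurrentLinkedQueue<>();

    /** Called whenever an instance registers, changes or is removed. */
    void record(String appName, String instanceId, Action action) {
        recentlyChanged.add(new Change(appName, instanceId, action));
    }

    /** Builds the delta: queued changes grouped by application, in queue order. */
    Map<String, List<Change>> buildDelta() {
        Map<String, List<Change>> byApp = new LinkedHashMap<>();
        for (Change change : recentlyChanged) {
            byApp.computeIfAbsent(change.appName, name -> new ArrayList<>()).add(change);
        }
        return byApp;
    }

    public static void main(String[] args) {
        RecentChangesSketch registry = new RecentChangesSketch();
        registry.record("ORDERS", "orders-1", Action.ADDED);
        registry.record("USERS", "users-1", Action.MODIFIED);
        registry.record("ORDERS", "orders-2", Action.DELETED);
        System.out.println(registry.buildDelta().keySet()); // prints [ORDERS, USERS]
    }
}
```

The response stays small because it only contains what changed recently, which is the whole point of the delta endpoint.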
Handling the delta-fetch result
The code that handles the result:
    if (delta == null) {
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                updateDelta(delta);
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    }
updateDelta
    private void updateDelta(Applications delta) {
        int deltaCount = 0;
        for (Application app : delta.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                Applications applications = getApplications();
                String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
                if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                    Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                    if (null == remoteApps) {
                        remoteApps = new Applications();
                        remoteRegionVsApps.put(instanceRegion, remoteApps);
                    }
                    applications = remoteApps;
                }

                ++deltaCount;
                if (ActionType.ADDED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        applications.addApplication(app);
                    }
                    logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                    applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        applications.addApplication(app);
                    }
                    logger.debug("Modified instance {} to the existing apps ", instance.getId());

                    applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);

                } else if (ActionType.DELETED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        applications.addApplication(app);
                    }
                    logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                    applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);
                }
            }
        }
        logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);

        getApplications().setVersion(delta.getVersion());
        getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());

        for (Applications applications : remoteRegionVsApps.values()) {
            applications.setVersion(delta.getVersion());
            applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
        }
    }
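The overall flow is:
- Get the locally cached applications
- If a fetched instance belongs to a remote region that has no entry yet, create a new entry for that region in the remoteRegionVsApps object
- Update the local instances according to each fetched instance's action type (added, modified, deleted)
- Filter and shuffle the instances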
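The update step in the list above boils down to replaying each change against the local view. The following is a minimal sketch with hypothetical types: ApplyDeltaSketch models the local cache as a map from application name to a map of instance id to status, and leaves out region routing as well as the final filter and shuffle.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ApplyDeltaSketch {
    enum Action { ADDED, MODIFIED, DELETED }

    /** One entry of a delta response, reduced to the fields needed here. */
    static final class Change {
        final String appName;
        final String instanceId;
        final String status;
        final Action action;

        Change(String appName, String instanceId, String status, Action action) {
            this.appName = appName;
            this.instanceId = instanceId;
            this.status = status;
            this.action = action;
        }
    }

    /** Replays the delta against the local view (app name -> instance id -> status). */
    static void apply(Map<String, Map<String, String>> local, List<Change> delta) {
        for (Change change : delta) {
            // Create the application entry on first sight, as updateDelta does.
            Map<String, String> instances = local.computeIfAbsent(change.appName, name -> new HashMap<>());
            switch (change.action) {
                case ADDED:
                case MODIFIED:
                    instances.put(change.instanceId, change.status);
                    break;
                case DELETED:
                    instances.remove(change.instanceId);
                    break;
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> local = new HashMap<>();
        apply(local, Arrays.asList(
                new Change("ORDERS", "orders-1", "UP", Action.ADDED),
                new Change("ORDERS", "orders-2", "UP", Action.ADDED),
                new Change("ORDERS", "orders-1", "DOWN", Action.MODIFIED),
                new Change("ORDERS", "orders-2", "UP", Action.DELETED)));
        System.out.println(local); // prints {ORDERS={orders-1=DOWN}}
    }
}
```

Added and modified entries are handled the same way because both simply overwrite whatever the local view holds for that instance id.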