Spring Cloud Ribbon负载均衡器处理方法
接下来撸一撸负载均衡器的内部,看看是如何获取服务实例,获取以后做了哪些处理,处理后又是如何选取服务实例的。
分成三个部分来撸:
- 配置
- 获取服务
- 选择服务
配置
在上一篇《撸一撸spring cloud ribbon的原理》的配置部分可以看到默认的负载均衡器是zoneawareloadbalancer。
看一看配置类。
位置:
spring-cloud-netflix-core-1.3.5.release.jar org.springframework.cloud.netflix.ribbon ribbonclientconfiguration.class
@suppresswarnings("deprecation") @configuration @enableconfigurationproperties //order is important here, last should be the default, first should be optional // see https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653 @import({okhttpribbonconfiguration.class, restclientribbonconfiguration.class, httpclientribbonconfiguration.class}) public class ribbonclientconfiguration { // 略 @bean @conditionalonmissingbean public iloadbalancer ribbonloadbalancer(iclientconfig config, serverlist<server> serverlist, serverlistfilter<server> serverlistfilter, irule rule, iping ping, serverlistupdater serverlistupdater) { if (this.propertiesfactory.isset(iloadbalancer.class, name)) { return this.propertiesfactory.get(iloadbalancer.class, config, name); } return new zoneawareloadbalancer<>(config, rule, ping, serverlist, serverlistfilter, serverlistupdater); } // 略 }
在实例化zoneawareloadbalancer的时候注入了,config、rule、ping、serverlist、serverlistfilter、serverlistupdater实例。
config:配置实例。
rule:负载均衡策略实例。
ping:ping实例。
serverlist:获取和更新服务的实例。
serverlistfilter:服务过滤实例。
serverlistupdater:服务列表信息更新实例。
@suppresswarnings("deprecation") @configuration @enableconfigurationproperties //order is important here, last should be the default, first should be optional // see https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653 @import({okhttpribbonconfiguration.class, restclientribbonconfiguration.class, httpclientribbonconfiguration.class}) public class ribbonclientconfiguration { // 略 @bean @conditionalonmissingbean public iclientconfig ribbonclientconfig() { defaultclientconfigimpl config = new defaultclientconfigimpl(); config.loadproperties(this.name); return config; } @bean @conditionalonmissingbean public irule ribbonrule(iclientconfig config) { if (this.propertiesfactory.isset(irule.class, name)) { return this.propertiesfactory.get(irule.class, config, name); } zoneavoidancerule rule = new zoneavoidancerule(); rule.initwithniwsconfig(config); return rule; } @bean @conditionalonmissingbean public iping ribbonping(iclientconfig config) { if (this.propertiesfactory.isset(iping.class, name)) { return this.propertiesfactory.get(iping.class, config, name); } return new dummyping(); } @bean @conditionalonmissingbean @suppresswarnings("unchecked") public serverlist<server> ribbonserverlist(iclientconfig config) { if (this.propertiesfactory.isset(serverlist.class, name)) { return this.propertiesfactory.get(serverlist.class, config, name); } configurationbasedserverlist serverlist = new configurationbasedserverlist(); serverlist.initwithniwsconfig(config); return serverlist; } @bean @conditionalonmissingbean public serverlistupdater ribbonserverlistupdater(iclientconfig config) { return new pollingserverlistupdater(config); } @bean @conditionalonmissingbean public iloadbalancer ribbonloadbalancer(iclientconfig config, serverlist<server> serverlist, serverlistfilter<server> serverlistfilter, irule rule, iping ping, serverlistupdater serverlistupdater) { if (this.propertiesfactory.isset(iloadbalancer.class, name)) { return this.propertiesfactory.get(iloadbalancer.class, config, name); } return new zoneawareloadbalancer<>(config, rule, ping, serverlist, serverlistfilter, serverlistupdater); } @bean @conditionalonmissingbean @suppresswarnings("unchecked") public serverlistfilter<server> ribbonserverlistfilter(iclientconfig config) { if (this.propertiesfactory.isset(serverlistfilter.class, name)) { return this.propertiesfactory.get(serverlistfilter.class, config, name); } zonepreferenceserverlistfilter filter = new zonepreferenceserverlistfilter(); filter.initwithniwsconfig(config); return filter; } @bean @conditionalonmissingbean public ribbonloadbalancercontext ribbonloadbalancercontext( iloadbalancer loadbalancer, iclientconfig config, retryhandler retryhandler) { return new ribbonloadbalancercontext(loadbalancer, config, retryhandler); } // 略 }
在这里配置相关的实例
config:defaultclientconfigimpl。
rule:zoneavoidancerule。
ping:dummyping。
serverlist:configurationbasedserverlist,基于配置的服务列表实例。
serverlistfilter:zonepreferenceserverlistfilter。
serverlistupdater:pollingserverlistupdater。
要注意的是,在这里serverlist的实例是configurationbasedserverlist,这是在未使用eureka时获取服务信息的实例,是从配置文件中获取。
那么在和eureka配合使用时,需要从 eureka server获取服务信息,那该是哪个实例来做这件事情呢。
在启用eureka服务发现时,会首先会采用eurekaribbonclientconfiguration配置类。
位置:
spring-cloud-netflix-eureka-client-1.3.5.release.jar org.springframework.cloud.netflix.ribbon.eureka eurekaribbonclientconfiguration.class
@configuration @commonslog public class eurekaribbonclientconfiguration { // 略 @bean @conditionalonmissingbean public iping ribbonping(iclientconfig config) { if (this.propertiesfactory.isset(iping.class, serviceid)) { return this.propertiesfactory.get(iping.class, config, serviceid); } niwsdiscoveryping ping = new niwsdiscoveryping(); ping.initwithniwsconfig(config); return ping; } @bean @conditionalonmissingbean public serverlist<?> ribbonserverlist(iclientconfig config, provider<eurekaclient> eurekaclientprovider) { if (this.propertiesfactory.isset(serverlist.class, serviceid)) { return this.propertiesfactory.get(serverlist.class, config, serviceid); } discoveryenabledniwsserverlist discoveryserverlist = new discoveryenabledniwsserverlist( config, eurekaclientprovider); domainextractingserverlist serverlist = new domainextractingserverlist( discoveryserverlist, config, this.approximatezonefromhostname); return serverlist; } // 略 }
在首先采用了eurekaribbonclientconfiguration配置后,实际上各实例变成了
config:defaultclientconfigimpl。
rule:zoneavoidancerule。
ping:niwsdiscoveryping。
serverlist:domainextractingserverlist,内部是discoveryenabledniwsserverlist,实际上是通过服务发现获取服务信息列表。
serverlistfilter:zonepreferenceserverlistfilter。
serverlistupdater:pollingserverlistupdater。
获取服务
在找到获取服务信息入口前,先把负载均衡器的类继承关系撸一下。
在zoneawareloadbalancer的构造中调用了父类dynamicserverlistloadbalancer构造。
位置:
ribbon-loadbalancer-2.2.2.jar
com.netflix.loadbalancer
zoneawareloadbalancer.class
在dynamicserverlistloadbalancer的构造中,调用了restofinit函数。
ribbon-loadbalancer-2.2.2.jar
com.netflix.loadbalancer
dynamicserverlistloadbalancer.class
void restofinit(iclientconfig clientconfig) { boolean primeconnection = this.isenableprimingconnections(); // turn this off to avoid duplicated asynchronous priming done in baseloadbalancer.setserverlist() this.setenableprimingconnections(false); enableandinitlearnnewserversfeature(); updatelistofservers(); if (primeconnection && this.getprimeconnections() != null) { this.getprimeconnections() .primeconnections(getreachableservers()); } this.setenableprimingconnections(primeconnection); logger.info("dynamicserverlistloadbalancer for client {} initialized: {}", clientconfig.getclientname(), this.tostring()); }
先是通过调用enableandinitlearnnewserversfeature方法启动定时更新服务列表,然后立即调用updatelistofservers函数马上获取并更新服务列表信息。
先看下enableandinitlearnnewserversfeature方法,实际上是调用了服务列表信息更新实例的start方法启动定时更新功能。
/** * feature that lets us add new instances (from amis) to the list of * existing servers that the lb will use call this method if you want this * feature enabled */ public void enableandinitlearnnewserversfeature() { logger.info("using serverlistupdater {}", serverlistupdater.getclass().getsimplename()); serverlistupdater.start(updateaction); }
这里的服务列表信息更新实例就是配置阶段配置的pollingserverlistupdater实例,看一下这个类的构造和start方法。
public class pollingserverlistupdater implements serverlistupdater { // 略 private static long listofservers_cache_update_delay = 1000; // msecs; private static int listofservers_cache_repeat_interval = 30 * 1000; // msecs; // 略 private final atomicboolean isactive = new atomicboolean(false); private volatile long lastupdated = system.currenttimemillis(); private final long initialdelayms; private final long refreshintervalms; // 略 public pollingserverlistupdater(iclientconfig clientconfig) { this(listofservers_cache_update_delay, getrefreshintervalms(clientconfig)); } public pollingserverlistupdater(final long initialdelayms, final long refreshintervalms) { this.initialdelayms = initialdelayms; this.refreshintervalms = refreshintervalms; } @override public synchronized void start(final updateaction updateaction) { if (isactive.compareandset(false, true)) { final runnable wrapperrunnable = new runnable() { @override public void run() { if (!isactive.get()) { if (scheduledfuture != null) { scheduledfuture.cancel(true); } return; } try { updateaction.doupdate(); lastupdated = system.currenttimemillis(); } catch (exception e) { logger.warn("failed one update cycle", e); } } }; scheduledfuture = getrefreshexecutor().schedulewithfixeddelay( wrapperrunnable, initialdelayms, refreshintervalms, timeunit.milliseconds ); } else { logger.info("already active, no-op"); } } // 略 }
从构造和常量定义看出来,延迟一秒执行,默认每隔30秒执行更新,可以通过配置修改间隔更新的时间。
从start方法看,就是开了一个定时执行的schedule,定时执行 updateaction.doupdate()。
回到start方法调用方dynamicserverlistloadbalancer类中看一下updateaction实例的定义。
protected final serverlistupdater.updateaction updateaction = new serverlistupdater.updateaction() { @override public void doupdate() { updatelistofservers(); } };
实际上就是调用了dynamicserverlistloadbalancer类的updatelistofservers方法,这跟启动完定时更新后立即更新服务信息列表的路径是一致的。
继续看updatelistofservers方法。
public void updatelistofservers() { list<t> servers = new arraylist<t>(); if (serverlistimpl != null) { servers = serverlistimpl.getupdatedlistofservers(); logger.debug("list of servers for {} obtained from discovery client: {}", getidentifier(), servers); if (filter != null) { servers = filter.getfilteredlistofservers(servers); logger.debug("filtered list of servers for {} obtained from discovery client: {}", getidentifier(), servers); } } updateallserverlist(servers); }
1.通过serverlist实例获取服务信息列表。
2.通过serverlistfilter 实例对获取到的服务信息列表进行过滤。
3.将过滤后的服务信息列表保存到loadbalancerstats中作为状态保持。
接下分别看一下。
1.通过serverlist实例获取服务信息列表。
serverlist实例就是配置阶段生成的domainextractingserverlist,获取服务信息都是委托给discoveryenabledniwsserverlist。
public class discoveryenabledniwsserverlist extends abstractserverlist<discoveryenabledserver>{ // 略 @override public list<discoveryenabledserver> getinitiallistofservers(){ return obtainserversviadiscovery(); } @override public list<discoveryenabledserver> getupdatedlistofservers(){ return obtainserversviadiscovery(); } private list<discoveryenabledserver> obtainserversviadiscovery() { list<discoveryenabledserver> serverlist = new arraylist<discoveryenabledserver>(); if (eurekaclientprovider == null || eurekaclientprovider.get() == null) { logger.warn("eurekaclient has not been initialized yet, returning an empty list"); return new arraylist<discoveryenabledserver>(); } eurekaclient eurekaclient = eurekaclientprovider.get(); if (vipaddresses!=null){ for (string vipaddress : vipaddresses.split(",")) { // if targetregion is null, it will be interpreted as the same region of client list<instanceinfo> listofinstanceinfo = eurekaclient.getinstancesbyvipaddress(vipaddress, issecure, targetregion); for (instanceinfo ii : listofinstanceinfo) { if (ii.getstatus().equals(instancestatus.up)) { if(shoulduseoverrideport){ if(logger.isdebugenabled()){ logger.debug("overriding port on client name: " + clientname + " to " + overrideport); } // copy is necessary since the instanceinfo builder just uses the original reference, // and we don't want to corrupt the global eureka copy of the object which may be // used by other clients in our system instanceinfo copy = new instanceinfo(ii); if(issecure){ ii = new instanceinfo.builder(copy).setsecureport(overrideport).build(); }else{ ii = new instanceinfo.builder(copy).setport(overrideport).build(); } } discoveryenabledserver des = new discoveryenabledserver(ii, issecure, shoulduseipaddr); des.setzone(discoveryclient.getzone(ii)); serverlist.add(des); } } if (serverlist.size()>0 && prioritizevipaddressbasedservers){ break; // if the current vipaddress has servers, we dont use subsequent vipaddress based servers } } } return serverlist; } // 略 }
可以看到其实就是通过eureka客户端从eureka服务端获取所有服务实例信息并把上线的包装成discoveryenabledserver实例,带有zone信息,做到服务列表中。
2.通过serverlistfilter 实例对获取到的服务信息列表进行过滤。
serverlistfilte实例就是配置阶段生成的zonepreferenceserverlistfilter,通过调用该实例的getfilteredlistofservers方法进行过滤。
@data @equalsandhashcode(callsuper = false) public class zonepreferenceserverlistfilter extends zoneaffinityserverlistfilter<server> { private string zone; @override public void initwithniwsconfig(iclientconfig niwsclientconfig) { super.initwithniwsconfig(niwsclientconfig); if (configurationmanager.getdeploymentcontext() != null) { this.zone = configurationmanager.getdeploymentcontext().getvalue( contextkey.zone); } } @override public list<server> getfilteredlistofservers(list<server> servers) { list<server> output = super.getfilteredlistofservers(servers); if (this.zone != null && output.size() == servers.size()) { list<server> local = new arraylist<server>(); for (server server : output) { if (this.zone.equalsignorecase(server.getzone())) { local.add(server); } } if (!local.isempty()) { return local; } } return output; } }
在getfilteredlistofservers方法里面,一上来是调用父类的同名方法先过滤,其实父类也是把和消费端同区域的服务给过滤出来使用,不仅如此,增加了些智能的判定,保证在故障/负载较高时或者可用实例较少时不进行同区域的过滤。
但是在zonepreferenceserverlistfilter.getfilteredlistofservers这里,就算父类没做过过滤,这里依然要把同zone的服务给滤出来使用,谁叫这里的类是zonepreference的呢。
这是比较怪异的地方,感觉父类的智能判定没什么作用。
还是看看zoneaffinityserverlistfilter.getfilteredlistofservers做的辛苦工作吧。
public class zoneaffinityserverlistfilter<t extends server> extends abstractserverlistfilter<t> implements iclientconfigaware { // 略 private boolean shouldenablezoneaffinity(list<t> filtered) { if (!zoneaffinity && !zoneexclusive) { return false; } if (zoneexclusive) { return true; } loadbalancerstats stats = getloadbalancerstats(); if (stats == null) { return zoneaffinity; } else { logger.debug("determining if zone affinity should be enabled with given server list: {}", filtered); zonesnapshot snapshot = stats.getzonesnapshot(filtered); double loadperserver = snapshot.getloadperserver(); int instancecount = snapshot.getinstancecount(); int circuitbreakertrippedcount = snapshot.getcircuittrippedcount(); if (((double) circuitbreakertrippedcount) / instancecount >= blackoutserverpercentagethreshold.get() || loadperserver >= activereqeustsperserverthreshold.get() || (instancecount - circuitbreakertrippedcount) < availableserversthreshold.get()) { logger.debug("zoneaffinity is overriden. blackoutserverpercentage: {}, activereqeustsperserver: {}, availableservers: {}", new object[] {(double) circuitbreakertrippedcount / instancecount, loadperserver, instancecount - circuitbreakertrippedcount}); return false; } else { return true; } } } @override public list<t> getfilteredlistofservers(list<t> servers) { if (zone != null && (zoneaffinity || zoneexclusive) && servers !=null && servers.size() > 0){ list<t> filteredservers = lists.newarraylist(iterables.filter( servers, this.zoneaffinitypredicate.getserveronlypredicate())); if (shouldenablezoneaffinity(filteredservers)) { return filteredservers; } else if (zoneaffinity) { overridecounter.increment(); } } return servers; } // 略 }
首先会将与消费端相同的zone的服务过滤出来,然后通过shouldenablezoneaffinity(filteredservers)来判定是否可以采纳同zone的服务,还是采用所有的服务。
在shouldenablezoneaffinity方法内,对相同zone的服务做了一次snapshot,获取这些服务的实例数量,平均负载,断路的实例数进行计算判定。
可以看一下initwithniwsconfig方法中关键指标的值。
判定条件:
断路实例百分比>=0.8(断路的实例数/服务的实例数量)
平均负载>=0.6
可用实例数<2(实例数量-断路的实例数)
如果达到判定条件,那么就使用全部的服务,保证可用性。
但,上面也说了,因为zonepreferenceserverlistfilter本身总是会选用和消费端zone一致的服务,所以zoneaffinityserverlistfilter.getfilteredlistofservers中做的智能操作并没什么用。
不过,当然可以通过自定义配置来采用zoneaffinityserverlistfilter实例。
3.将过滤后的服务信息列表保存到loadbalancerstats中作为状态保持。
跟进updateallserverlist(servers);
去,一步步深入,会发现,实际上是保存到loadbalancerstats
中,并且这时候的服务是按照zone分组以hashmap<string, list<server>>
结构保存的,key是zone。
选择服务
实现了iloadbalancer接口的负载均衡器,是通过实现chooseserver方法来进行服务的选择,选择后的服务做为目标请求服务。
看一下zoneawareloadbalancer.chooseserver方法。
@override public server chooseserver(object key) { if (!enabled.get() || getloadbalancerstats().getavailablezones().size() <= 1) { logger.debug("zone aware logic disabled or there is only one zone"); return super.chooseserver(key); } server server = null; try { loadbalancerstats lbstats = getloadbalancerstats(); map<string, zonesnapshot> zonesnapshot = zoneavoidancerule.createsnapshot(lbstats); logger.debug("zone snapshots: {}", zonesnapshot); if (triggeringload == null) { triggeringload = dynamicpropertyfactory.getinstance().getdoubleproperty( "zoneawareniwsdiscoveryloadbalancer." + this.getname() + ".triggeringloadperserverthreshold", 0.2d); } if (triggeringblackoutpercentage == null) { triggeringblackoutpercentage = dynamicpropertyfactory.getinstance().getdoubleproperty( "zoneawareniwsdiscoveryloadbalancer." + this.getname() + ".avoidzonewithblackoutpercetage", 0.99999d); } set<string> availablezones = zoneavoidancerule.getavailablezones(zonesnapshot, triggeringload.get(), triggeringblackoutpercentage.get()); logger.debug("available zones: {}", availablezones); if (availablezones != null && availablezones.size() < zonesnapshot.keyset().size()) { string zone = zoneavoidancerule.randomchoosezone(zonesnapshot, availablezones); logger.debug("zone chosen: {}", zone); if (zone != null) { baseloadbalancer zoneloadbalancer = getloadbalancer(zone); server = zoneloadbalancer.chooseserver(key); } } } catch (exception e) { logger.error("error choosing server using zone aware logic for load balancer={}", name, e); } if (server != null) { return server; } else { logger.debug("zone avoidance logic is not invoked."); return super.chooseserver(key); } }
注意这里有两种用法:
1.通过配置zoneawareniwsdiscoveryloadbalancer.enabled=false关闭区域感知负载均衡,或者zone的个数<=1个。
2.采用区域感知,或者zone的个数>1。
一个个来看一下
1.通过配置zoneawareniwsdiscoveryloadbalancer.enabled=false关闭区域感知负载均衡,或者zone的个数<=1个。
这种情况下,调用了父类baseloadbalancer.chooseserver方法。
public server chooseserver(object key) { if (counter == null) { counter = createcounter(); } counter.increment(); if (rule == null) { return null; } else { try { return rule.choose(key); } catch (exception e) { logger.warn("loadbalancer [{}]: error choosing server for key {}", name, key, e); return null; } } }
这里使用的负载均衡策略rule实际上就是构造zoneawareloadbalancer时传进来的,在配置阶段生成的zoneavoidancerule策略实例。
public void setrule(irule rule) { if (rule != null) { this.rule = rule; } else { /* default rule */ this.rule = new roundrobinrule(); } if (this.rule.getloadbalancer() != this) { this.rule.setloadbalancer(this); } }
假设,如果没有配置,默认用的是roundrobinrule策略实例。
2.采用区域感知,或者zone的个数>1。
public server chooseserver(object key) { if (!enabled.get() || getloadbalancerstats().getavailablezones().size() <= 1) { logger.debug("zone aware logic disabled or there is only one zone"); return super.chooseserver(key); } server server = null; try { loadbalancerstats lbstats = getloadbalancerstats(); map<string, zonesnapshot> zonesnapshot = zoneavoidancerule.createsnapshot(lbstats); logger.debug("zone snapshots: {}", zonesnapshot); if (triggeringload == null) { triggeringload = dynamicpropertyfactory.getinstance().getdoubleproperty( "zoneawareniwsdiscoveryloadbalancer." + this.getname() + ".triggeringloadperserverthreshold", 0.2d); } if (triggeringblackoutpercentage == null) { triggeringblackoutpercentage = dynamicpropertyfactory.getinstance().getdoubleproperty( "zoneawareniwsdiscoveryloadbalancer." + this.getname() + ".avoidzonewithblackoutpercetage", 0.99999d); } set<string> availablezones = zoneavoidancerule.getavailablezones(zonesnapshot, triggeringload.get(), triggeringblackoutpercentage.get()); logger.debug("available zones: {}", availablezones); if (availablezones != null && availablezones.size() < zonesnapshot.keyset().size()) { string zone = zoneavoidancerule.randomchoosezone(zonesnapshot, availablezones); logger.debug("zone chosen: {}", zone); if (zone != null) { baseloadbalancer zoneloadbalancer = getloadbalancer(zone); server = zoneloadbalancer.chooseserver(key); } } } catch (exception e) { logger.error("error choosing server using zone aware logic for load balancer={}", name, e); } if (server != null) { return server; } else { logger.debug("zone avoidance logic is not invoked."); return super.chooseserver(key); } }
在这种情况下默认使用zoneavoidancerule负载均衡策略。
获取zone的snapshot信息。
获取可用的zone,通过观察zoneavoidancerule.getavailablezones定义,不是可用zone的条件是:
- 所属实例数==0。
- 故障率>0.99999或者平均负载<0。
- 如果不是上面两种情况,就选择负载最高的一个去除不作为可用的zone。
可用zone都获取后,随机选一个。
并从该zone中,通过zoneawareloadbalancer的父类baseloadbalancer.chooseserver选取服务,上面整理过,baseloadbalancer里如果没有传入rule,那么默认使用roundrobinrule策略轮寻一个服务。
其实,还是上面获取服务中zonepreferenceserverlistfilter过滤器的问题,实际上过滤出来的只有一个和消费端相同的一个zone的服务,所以第2.部分的从可用zone中选取服务的功能是走不到,要走到就得把过滤器给换掉。
总结:
配置的负载均衡器会启动schedule获取服务信息,在使用了eureka客户端时,会从eureka服务获取所有服务实例信息,通过过滤器过滤出可以使用的服务,过滤器默认只过滤出与消费端相同zone的服务,如果要保证高可用可配置zoneaffinityserverlistfilter过滤器,过滤后的服务列表,通过实现了irule接口的负载均衡策略选取对应的服务,如果是使用zone感知的策略,可以从负载情况良好的zone中选取合适的服务。
推荐阅读
-
Spring Cloud Ribbon实现客户端负载均衡的示例
-
详解Spring Cloud负载均衡重要组件Ribbon中重要类的用法
-
Spring Cloud zuul自定义统一异常处理实现方法
-
Spring Cloud Hystrix异常处理方法详解
-
撸一撸Spring Cloud Ribbon的原理-负载均衡策略
-
spring cloud系列学习(SpringCloud之服务注册之Ribbon负载均衡)
-
nginx负载均衡器处理session共享的几种方法
-
Spring Cloud Ribbon负载均衡器处理方法
-
Spring Cloud 负载均衡器 Ribbon原理及实现
-
Spring Cloud Ribbon实现客户端负载均衡的示例