Spring Cloud Ribbon源码分析
如何使用Spring Cloud Ribbon进行负载均衡调用?
前面写的Eureka Client,服务消费者调用服务消费者的时候,是通过负载均衡方式调用的。这里再回顾下当时的用法:
1.在启动类中,声明一个RestTemplate的bean,用@LoadBalanced注解修饰:
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}
2.在进行http调用的地方,通过@Autowired方式,注入RestTemplate,再通过restTemplate调用。调用时,写服务名就可以,比如按照下面这种方式调用。
return this.restTemplate.getForEntity("http://MICROSERVICE-PROVIDER/provider", String.class).getBody();
通过这两步操作,进行调用时,就可以通过服务名,得到多个服务实例的信息,通过负载均衡的方式进行调用。
从上面的步骤看来,使用负载均衡的开发量还是挺简单的,不过为什么在restTemplate上加个注解,通过restTemplate进行调用,就可以做到负载均衡呢,这里就需要研究下源码才能知道原因了。
Spring Cloud Ribbon源码分析
既然是在restTemplate加了@LoadBalanced注解,那就进去这个注解里面看下吧。
/**
* Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient
* @author Spencer Gibb
*/
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
从注释中可以知道,这个注解是用来给RestTemplate做标记,以使用负载均衡客户端(LoadBalancerClient)来配置它。所以,我们在生成的RestTemplate的bean上添加这么一个注解,这个bean就会配置LoadBalancerClient。
那么,就再看下LoadBalancerClient的代码:
/**
* Represents a client side load balancer
* @author Spencer Gibb
*/
public interface LoadBalancerClient {
/**
* Choose a ServiceInstance from the LoadBalancer for the specified service
* @param serviceId the service id to look up the LoadBalancer
* @return a ServiceInstance that matches the serviceId
*/
ServiceInstance choose(String serviceId);
/**
* execute request using a ServiceInstance from the LoadBalancer for the specified
* service
* @param serviceId the service id to look up the LoadBalancer
* @param request allows implementations to execute pre and post actions such as
* incrementing metrics
* @return the result of the LoadBalancerRequest callback on the selected
* ServiceInstance
*/
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
/**
* Create a proper URI with a real host and port for systems to utilize.
* Some systems use a URI with the logical serivce name as the host,
* such as http://myservice/path/to/service. This will replace the
* service name with the host:port from the ServiceInstance.
* @param instance
* @param original a URI with the host as a logical service name
* @return a reconstructed URI
*/
URI reconstructURI(ServiceInstance instance, URI original);
}
LoadBalancerClient是一个接口,里面有三个方法。
第一个,ServiceInstance choose(String serviceId);从方法名上就可以看出,是根据传入的serviceId(服务名),从负载均衡器中选择一个服务实例,服务实例通过ServiceInstance类来表示。
第二个,execute方法,使用从负载均衡器中选择的服务实例来执行请求内容。
第三个,URI reconstructURI(ServiceInstance instance, URI original);方法,是重新构建一个URI的,还记得我们在代码中,通过RestTemplate请求服务时,写的是服务名吧,这个方法就会把这个请求的URI进行转换,返回host+port,通过host+port的形式去请求服务。
从工程中搜索LoadBalancerClient接口的实现类,可以找到RibbonLoadBalancerClient类实现了这个接口,并且实现了接口中定义的方法。
再梳理一下逻辑,我们在RestTemplate上添加了@LoadBalanced注解,RibbonLoadBalancerClient就会配置到这个RestTemplate实例上。
在LoadBalancerClient接口的同一个包路径下,还会看到另一个LoadBalancerAutoConfiguration类,看名字就感觉这是一个自动配置LoadBalancer的,进去这个类看一下。
/**
* Auto configuration for Ribbon (client side load balancing).
*
* @author Spencer Gibb
* @author Dave Syer
*/
@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
public class LoadBalancerAutoConfiguration {
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
final List<RestTemplateCustomizer> customizers) {
return new SmartInitializingSingleton() {
@Override
public void afterSingletonsInstantiated() {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
}
};
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return new RestTemplateCustomizer() {
@Override
public void customize(RestTemplate restTemplate) {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
}
};
}
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient) {
return new LoadBalancerInterceptor(loadBalancerClient);
}
}
注释中说明这个类是为Ribbon做自动配置的,类上的@Configuration说明这是一个配置类,在当前项目中存在RestTemplate类、并且存在LoadBalancerClient接口的实现类时,就满足了自动化配置的条件。
在LoadBalancerAutoConfiguration类中,创建了一个LoadBalancerInterceptor拦截器,还维护了一个被@LoadBalanced修饰的RestTemplate列表,在初始化的时候,会为每个restTemplate实例添加LoadBalancerInterceptor拦截器。
我们自己实现的项目,就定义了RestTemplate的一个对象,并且引入了spring-cloud相关的包,存在RibbonLoadBalancerClient作为LoadBalancerClient的实现类,所以,满足自动化配置的条件。接下来就看下,在restTemplate实例添加的LoadBalancerInterceptor拦截器的逻辑。
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
this.loadBalancer = loadBalancer;
}
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
return this.loadBalancer.execute(serviceName,
new LoadBalancerRequest<ClientHttpResponse>() {
@Override
public ClientHttpResponse apply(final ServiceInstance instance)
throws Exception {
HttpRequest serviceRequest = new ServiceRequestWrapper(request,
instance);
return execution.execute(serviceRequest, body);
}
});
}
private class ServiceRequestWrapper extends HttpRequestWrapper {
private final ServiceInstance instance;
public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance) {
super(request);
this.instance = instance;
}
@Override
public URI getURI() {
URI uri = LoadBalancerInterceptor.this.loadBalancer.reconstructURI(
this.instance, getRequest().getURI());
return uri;
}
}
}
由于在自动配置类中,对restTemplate实例添加了LoadBalancerInterceptor拦截器,所以,当用restTemplate发送http请求时,就会执行这个拦截器的intercept方法。
intercept方法中,会根据request.getURI(),获取请求的uri,再获取host,我们在发送http请求的时候,是用的服务名作为host,所以,这里就会拿到服务名,再调用具体LoadBalancerClient实例的execute方法,发送请求。
LoadBalancerClient的实现类为RibbonLoadBalancerClient,最终的负载均衡请求由它来执行,所以,还需要再梳理下RibbonLoadBalancerClient的逻辑。
先看下RibbonLoadBalancerClient中的execute方法:
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
serviceId), serverIntrospector(serviceId).getMetadata(server));
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
T returnVal = request.apply(ribbonServer);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
服务名作为serviceId字段传进来,先通过getLoadBalancer获取loadBalancer,再根据loadBalancer获取server,下面是getServer的代码:
protected Server getServer(ILoadBalancer loadBalancer) {
if (loadBalancer == null) {
return null;
}
return loadBalancer.chooseServer("default"); // TODO: better handling of key
}
如果loadBalancer为空,就直接返回空,否则就调用loadBalancer的chooseServer方法,获取相应的server。
看一下ILoadBalancer是一个接口,里面声明了一系列负载均衡实现的方法:
public interface ILoadBalancer {
public void addServers(List<Server> newServers);
public Server chooseServer(Object key);
public void markServerDown(Server server);
public List<Server> getReachableServers();
public List<Server> getAllServers();
}
这里面还有一个getServerList方法,不过已经标记为Deprecated,所以就没有列出。
这些方法名比较直观,很容易就能猜出是干啥的,addServers是用来添加一个server集合,chooseServer是选择一个server,markServerDown用来标记某个服务下线,getReachableServers获取可用的Server集合,getAllServers是获取所有的server集合。
ILoadBalancer有很多实现,那具体是用的哪个类呢,可以通过RibbonClientConfiguration类看到,这个配置类在初始化的时候,返回了ZoneAwareLoadBalancer作为负载均衡器。
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping) {
ZoneAwareLoadBalancer<Server> balancer = LoadBalancerBuilder.newBuilder()
.withClientConfig(config).withRule(rule).withPing(ping)
.withServerListFilter(serverListFilter).withDynamicServerList(serverList)
.buildDynamicServerListLoadBalancer();
return balancer;
}
ZoneAwareLoadBalancer从名字中可以看出来,这个负载均衡器和zone是有关系的。下面看下ZoneAwareLoadBalancer中的chooseServer方法:
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Throwable e) {
logger.error("Unexpected exception when choosing server using zone aware logic", e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
这个方法会根据server的zone和可用性来选择具体的实例,返回一个Server对象。
通过ZoneAwareLoadBalancer选择具体的Server之后,再包装成RibbonServer对象,之前返回的server是该对象中的一个字段,除此之外,还有服务名serviceId,是否需要使用https等信息。最后,通过LoadBalancerRequest的apply方法,向具体的server发请求,从而实现了负载均衡。
下面是apply方法的定义:
public interface LoadBalancerRequest<T> {
public T apply(ServiceInstance instance) throws Exception;
}
在请求时,传入的ribbonServer对象,被当成ServiceInstance类型的对象进行接收。ServiceInstance是一个接口,定义了服务治理系统中,每个实例需要提供的信息,比如serviceId,host,port等。
LoadBalancerRequest是一个接口,最终会通过实现类的apply方法去执行,实现类是在LoadBalancerInterceptor中调用RibbonLoadBalancerClient的execute方法时,传进来的一个匿名类,可以通过查看LoadBalancerInterceptor的代码看到。
创建LoadBalancerRequest匿名类的时候,就重写了apply方法,apply方法中,还新建了一个ServiceRequestWrapper的内部类,这个类中,就重写了getURI方法,getURI方法会调用loadBalancer的reconstructURI方法来构建uri。
看到这里,已经可以大体知道Ribbon实现负载均衡的流程了,我们在RestTemplate上添加注解,就会有LoadBalancerClient的对象来配置它,也就是RibbonLoadBalancerClient。同时,LoadBalancerAutoConfiguration会进行配置,创建一个LoadBalancerInterceptor,并且拿到我们声明的所有restTemplate,在这些restTemplate中添加LoadBalancerInterceptor拦截器。
当通过restTemplate发送请求时,就会经过这个拦截器,在拦截器中,就会调用RibbonLoadBalancerClient中的方法,获取到根据服务名,通过负载均衡方法获取到服务实例,然后去请求这个实例。
上面说的这些,是如何对请求进行负载均衡的,但是还有个问题,我们请求的实例,是从Eureka Server上获取到的,那这个实例列表是如何获取的呢?怎么保证这个实例列表中的实例是可用的呢?
在RibbonLoadBalancerClient选择实例的时候,是通过ILoadBalancer的实现类根据负载均衡算法选择服务实例的,也就是ZoneAwareLoadBalancer的chooseServer中的逻辑,那就在这里找线索。查看ZoneAwareLoadBalancer的继承关系,可以看到如下图所示。
可以看到,最上面是ILoadBalancer接口,AbstractLoadBalancer类继承了这个接口,BaseLoadBalancer继承了AbstractLoadBalancer类,DynamicServerListLoadBalancer继承了BaseLoadBalancer,ZoneAwareLoadBalancer继承了DynamicServerListLoadBalancer。
ILoadBalancer接口的代码已经看过了,现在看下AbstractLoadBalancer的代码:
public abstract class AbstractLoadBalancer implements ILoadBalancer {
public enum ServerGroup{
ALL,
STATUS_UP,
STATUS_NOT_UP
}
/**
* delegate to {@link #chooseServer(Object)} with parameter null.
*/
public Server chooseServer() {
return chooseServer(null);
}
/**
* List of servers that this Loadbalancer knows about
*
* @param serverGroup Servers grouped by status, e.g., {@link ServerGroup#STATUS_UP}
*/
public abstract List<Server> getServerList(ServerGroup serverGroup);
/**
* Obtain LoadBalancer related Statistics
*/
public abstract LoadBalancerStats getLoadBalancerStats();
}
这是一个抽象类,里面加了一个枚举,增加了两个抽象方法。定义的chooseServer方法。
下面再看BaseLoadBalancer类,BaseLoadBalancer类就算是负载均衡器的一个基础实现类,在里面可以看到定义了两个list:
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());
从名字上看,这就是维护所有服务的实例列表,和维护状态为up的实例列表。
而且还可以看到BaseLoadBalancer中实现的ILoadBalancer接口中的方法,比如下面这两个,获取可用的服务列表,就会把upServerList返回,获取所有的服务列表,就会把allServerList返回。
@Override
public List<Server> getReachableServers() {
return Collections.unmodifiableList(upServerList);
}
@Override
public List<Server> getAllServers() {
return Collections.unmodifiableList(allServerList);
}
接下来,再看DynamicServerListLoadBalancer类。从类头上的注释可以知道,这个类可以动态的获取服务列表,并且利用filter对服务列表进行过滤。
在DynamicServerListLoadBalancer类中,能看到定义了一个ServerList类型的serverListImpl字段,ServerList是一个接口,里面有两个方法:
public interface ServerList<T extends Server> {
public List<T> getInitialListOfServers();
/**
* Return updated list of servers. This is called say every 30 secs
* (configurable) by the Loadbalancer's Ping cycle
*
*/
public List<T> getUpdatedListOfServers();
}
getInitialListOfServers是获取初始化的服务列表。
getUpdatedListOfServers是获取更新的服务列表。
ServerList有多个实现类,具体用的哪个呢,可以在EurekaRibbonClientConfiguration类中找到,这是Ribbon和Eureka结合的自动配置类,这里面有个方法:
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config) {
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
方法中先新建了一个DiscoveryEnabledNIWSServerList类型的对象,又把这个对象作为一个参数,创建了DomainExtractingServerList类型的对象,最终返回的是DomainExtractingServerList的实例对象。
查看DomainExtractingServerList中重写的这两个方法,发现还是调用的DiscoveryEnabledNIWSServerList中的方法。然后,进到DiscoveryEnabledNIWSServerList类中,看这两个方法的定义:
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
return obtainServersViaDiscovery();
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
这两个方法,都是调用了obtainServersViaDiscovery这个方法:
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// copy is necessary since the InstanceInfo builder just uses the original reference,
// and we don't want to corrupt the global eureka copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
des.setZone(DiscoveryClient.getZone(ii));
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}
在这个方法中,就是通过eurekaClient去注册中心获取服务,将状态为up的服务实例封装成DiscoveryEnabledServer对象,然后放入列表返回,这就是获取服务列表的流程。
获取服务列表的流程知道了,那是如何触发去获取,如何更新服务列表的呢?再看DynamicServerListLoadBalancer类中的代码,有一段:
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
ServerListUpdater是一个接口,在DynamicServerListLoadBalancer的构造函数中,创建了一个PollingServerListUpdater类的对象,为ServerListUpdater字段赋值。进入PollingServerListUpdater类看一下:
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
里面有个start方法,实现了Runnable接口,run方法里调用UpdateAction的doUpdate,之后再启动一个定时任务,执行这个方法。定时任务传入的两个时间参数:initialDelayMs和refreshIntervalMs,任务启动后一秒开始执行,并且每隔三十秒执行一次,用于刷新列表。
看到这里,就可以大体了解了,构造DynamicServerListLoadBalancer实例的时候,就会启动一个定时任务了,一开始先获取服务列表,之后每隔三十秒获取一次。负载均衡时,就是通过负载均衡算法在实例列表中选择一个,发送请求。
这就是Ribbon源码大体的流程,还有很多细节就不展开了,有兴趣的同学可以再仔细研究下。
参考资料:
1.《Spring Cloud与Docker微服务架构实战》 周立 著
2.《Spring Cloud微服务实战》 翟永超 著
3.《深入理解Spring Cloud与微服务构建》 方志朋 著
下一篇: 并查集--More is better
推荐阅读
-
spring cloud 之 客户端负载均衡Ribbon深入理解
-
关于Spring启动时Context加载源码分析
-
spring cloud 之 客户端负载均衡Ribbon深入理解
-
详解spring cloud中使用Ribbon实现客户端的软负载均衡
-
Spring Cloud Ribbon实现客户端负载均衡的方法
-
关于Spring启动时Context加载源码分析
-
Spring源码分析——调试环境搭建(可能是最省事的构建方法)
-
详解spring cloud中使用Ribbon实现客户端的软负载均衡
-
spring-session简介及实现原理源码分析
-
详解Spring cloud使用Ribbon进行Restful请求