Ribbon源码分析
Ribbon源码分析
@Qualifier 注解
在分析Ribbon源码之前,需要先了解@Qualifier注解的作用
@Qualifier在这里就相当于起到一个标记的作用
测试伪代码
- 创建TestClass类
public class TestClass {
private String name;
public TestClass(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "TestClass{" +
"name='" + name + '\'' +
'}';
}
}
- 定义TestConfigure配置类,并且在注入@Bean的同时,使用@Qualifier标记
@Configuration
public class TestConfigure {
@Qualifier
@Bean("testClass1")
public TestClass test1(){
return new TestClass ("testClass1");
}
@Qualifier
@Bean("testClass2")
public TestClass test2(){
return new TestClass ("testClass2");
}
}
- 定义TestController测试类
/**
* 测试添加了@Qualifier注解的类自动会装配到testClassList
*/
@RestController
public class TestController {
@Qualifier // spring会自动扫描添加了@Qualifier注解的TestClass的类,装配到testClassList中
@Autowired
private List<TestClass> testClassList = Collections.emptyList();
@GetMapping("/testQualifier")
public String getQualifier(){
return testClassList.toString ();
}
}
- 测试结果,发现我们刚刚定义的testClass1和testClass2都装配到testClassList中。
源码分析
@RestController
public class UserController {
@Autowired
private RestTemplate restTemplate;
/**
* 注入RestTemplate
* @param restTemplateBuilder
* @return
*/
@Bean
@LoadBalanced //添加此注解就可以针对RestTemplate在请求getForObject()方法中进行拦截,实现客户端的负载均衡
public RestTemplate restTemplate(RestTemplateBuilder restTemplateBuilder){
return restTemplateBuilder.build();
}
@GetMapping("/orders")
public String getOrders(){
// 调用订单的服务获得订单信息
// HttpClient / RestTemplate /OkHttp /JDK HttpUrlConnection (默认的几种远程连接的客户端)
/* ServiceInstance serviceInstance=loadBalancerClient.choose("spring-cloud-order-service");
String url=String.format("http://%s:%s",serviceInstance.getHost(),serviceInstance.getPort()+"/orders");*/
return restTemplate.getForObject("http://spring-cloud-order-service/orders",String.class);
}
}
LoadBalancerAutoConfiguration
在springBoot自动装配中,@LoadBalanced注解由LoadBalancerAutoConfiguration配置类扫描到
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
}
在配置类中,我们也发现有@LoadBalanced注解的存在,点进去查看@LoadBalanced注解,其实他就是使用@Qualifier注解标记的,它会将所有加了@Qualifier注解的 RestTemplate类装配到restTemplates中,从而类使用请求的拦截。
@LoadBalanced
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
LoadBalancerInterceptor / RestTemplateCustomizer
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
public interface RestTemplateCustomizer {
void customize(RestTemplate restTemplate);
}
在LoadBalancerAutoConfiguration中声明了LoadBalancerInterceptor这个Bean,并且在装载RestTemplateCustomizer过程中传递了LoadBalancerInterceptor,其作用是对修饰了@LoadBalanced注解的RestTemplate实例设置了一个LoadBalancerInterceptor拦截器。
SmartInitializingSingleton
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
遍历所有的加了@LoadBalance注解的RestTemplate,并且其进行包装。
LoadBalancerInterceptor.intercept()
当调用restTemplate进行请求的时候,由于在初始化阶段添加了LoadBalancerInterceptor拦截器,
restTemplate.getForObject("http://spring-cloud-order-service/orders",String.class);
所以会进入LoadBalancerInterceptor.intercept()这个方法中,主要实现了对于请求的拦截。
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
//获取请求的url
final URI originalUri = request.getURI();
//获得服务名称
String serviceName = originalUri.getHost();
Assert.state(serviceName != null,
"Request URI does not contain a valid hostname: " + originalUri);
//交给LoadBalancerClient去执行这个请求
return this.loadBalancer.execute(serviceName,
this.requestFactory.createRequest(request, body, execution));
}
}
LoadBalancerClient
public interface LoadBalancerClient extends ServiceInstanceChooser {
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
<T> T execute(String serviceId, ServiceInstance serviceInstance,
LoadBalancerRequest<T> request) throws IOException;
URI reconstructURI(ServiceInstance instance, URI original);
}
LoadBalancerClient其实是一个接口,我们看一下它的类图,它有两个具体的实现。
此时,LoadBalancerClient的具体实例应该是RibbonLoadBalancerClient,这个对象实例是在 RibbonAutoConfiguration这个类中进行注入的。
RibbonLoadBalancerClient
RibbonLoadBalancerClient这个类的代码比较长,我们主要看一下他的核心方法execute,
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
//根据serviceId获得一个ILoadBalancer
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
//调用getServer方法去获取一个服务实例
Server server = getServer(loadBalancer, hint);
//判断Server的值是否为空。这里的Server实际上就是传统的一个服务节点,这个对象存储了服务节点的一些元数据,比如host、port等
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
@Configuration
@Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
@RibbonClients
@AutoConfigureAfter(
name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
@AutoConfigureBefore({ LoadBalancerAutoConfiguration.class,
AsyncLoadBalancerAutoConfiguration.class })
@EnableConfigurationProperties({ RibbonEagerLoadProperties.class,
ServerIntrospectorProperties.class })
public class RibbonAutoConfiguration {
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(springClientFactory());
}
}
getLoadBalancer
getLoadBalancer根据服务名称获取具体的LoadBalancer,这里的LoadBalancer实现是ZoneAwareLoadBalancer
getServer
getServer是用来获得一个具体的服务节点,它的实现如下
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
if (loadBalancer == null) {
return null;
}
// Use 'default' on a null hint, or just pass it on?
return loadBalancer.chooseServer(hint != null ? hint : "default");
}
通过代码可以看到,getServer实际调用了IloadBalancer.chooseServer这个方法,ILoadBalancer这个 是一个负载均衡器接口。
public interface ILoadBalancer {
public void addServers(List<Server> newServers);
public Server chooseServer(Object key);
public void markServerDown(Server server);
public List<Server> getServerList(boolean availableOnly);
public List<Server> getReachableServers();
public List<Server> getAllServers();
}
1.addServers表示向负载均衡器中维护的实例列表增加服务实例
2.chooseServer表示通过某种策略,从负载均衡服务器中挑选出一个具体的服务实例
3.markServerDown表示用来通知和标识负载均衡器中某个具体实例已经停止服务,否则负载均衡 器在下一次获取服务实例清单前都会认为这个服务实例是正常工作的
4.getReachableServers表示获取当前正常工作的服务实例列表
5.getAllServers表示获取所有的服务实例列表,包括正常的服务和停止工作的服务
从整个类的关系图来看,BaseLoadBalancer类实现了基础的负载均衡,而 DynamicServerListLoadBalancer和ZoneAwareLoadBalancer则是在负载均衡策略的基础上做了一些 功能扩展。
- AbstractLoadBalancer实现了ILoadBalancer接口,它定义了服务分组的枚举 类/chooseServer(用来选取一个服务实例)/getServerList(获取某一个分组中的所有服务实 例)/getLoadBalancerStats用来获得一个LoadBalancerStats对象,这个对象保存了每一个服务 的状态信息。
- BaseLoadBalancer,它实现了作为负载均衡器的基本功能,比如服务列表维护、服务存活状态监 测、负载均衡算法选择Server等。但是它只是完成基本功能,在有些复杂场景中还无法实现,比如 动态服务列表、Server过滤、Zone区域意识(服务之间的调用希望尽可能是在同一个区域内进 行,减少延迟)。
- DynamicServerListLoadBalancer是BaseLoadbalancer的一个子类,它对基础负载均衡提供了扩 展,从名字上可以看出,它提供了动态服务列表的特性 ZoneAwareLoadBalancer 它是在DynamicServerListLoadBalancer的基础上,增加了以Zone的 形式来配置多个LoadBalancer的功能。
那在getServer方法中, loadBalancer.chooseServer 具体的实现类是哪一个呢?我们找到 RibbonClientConfiguration这个类~~~
@Configuration(
proxyBeanMethods = false
)
@EnableConfigurationProperties
@Import({HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class})
public class RibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
return (ILoadBalancer)(this.propertiesFactory.isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory.get(ILoadBalancer.class, config, this.name) : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater));
}
}
在这个bean的声明中可以看到,默认情况下采用的是ZoneAwareLoadBalancer。
ZoneAwareLoadBalancer
Zone表示区域的意思,区域指的就是地理区域的概念,一般较大规模的互联网公司,都会做跨区域部 署,这样做有几个好处,第一个是为不同地域的用户提供近的访问节点减少访问延迟,其次是为了保 证高可用,做容灾处理。
而ZoneAwareLoadBalancer就是提供了具备区域意识的负载均衡器,它的主要作用是对Zone进行了感 知,保证每个Zone里面的负载均衡策略都是隔离的,它并不保证A区域过来的请求一定会发动到A区域 对应的Server内。真正实现这个需求的是 ZonePreferenceServerListFilter/ZoneAffinityServerListFilter 。
ZoneAwareLoadBalancer的核心功能是
- 若开启了区域意识,且zone的个数 > 1,就继续区域选择逻辑
- 根据ZoneAvoidanceRule.getAvailableZones()方法拿到可用区们(会T除掉完全不可用的区域 们,以及可用但是负载高的一个区域)
- 从可用区zone们中,通过ZoneAvoidanceRule.randomChooseZone随机选一个zone出来 (该随 机遵从权重规则:谁的zone里面Server数量多,被选中的概率越大)
- 在选中的zone里面的所有Server中,采用该zone对对应的Rule,进行choose
@Override
public Server chooseServer(Object key) {
//ENABLED,表示是否用区域意识的choose选择Server,默认是true,
//如果禁用了区域、或者只有一个zone,就直接按照父类的逻辑来进行处理,父类默认采用轮询算法
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
//根据相关阈值计算可用区域
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
//从可用区域中随机选择一个区域,zone里面的服务器节点越多,被选中的概率越大
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
//根据zone获得该zone中的LB,然后根据该Zone的负载均衡算法选择一个server
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
BaseLoadBalancer.chooseServer
根据默认的负载均衡算法来获得指定的服务节点。默认的算法是RoundBin。
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
protected IRule rule = DEFAULT_RULE;
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
//选择一种负载均衡的算法
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
}
这里的rule是Ribbon中负载均衡的规则,默认使用轮询的负载均衡规则
IRule(默认采用轮询的负载均衡规则)
自定义Rule轮询规则
自定义轮询规则
public class GxDefineIpHashRule extends AbstractLoadBalancerRule{
@Override
public void initWithNiwsConfig(IClientConfig iClientConfig) {
}
public Server choose(ILoadBalancer lb,Object key){
if (lb == null) {
return null;
}
Server server = null;
while(server==null){
List<Server> allList=lb.getAllServers();
System.out.println(allList);
int index=0;
server=allList.get(index);
}
return server;
}
@Override
public Server choose(Object key) {
return choose(getLoadBalancer(),key);
}
}
服务列表的加载过程
ZoneAwareLoadBalancer.chooseServer()
public Server chooseServer(Object key) {
Server server = null;
try {
........
if (server != null) {
return server;
} else {
//调用父类的BaseLoadBalancer的chooseServer()
return super.chooseServer(key);
}
}
}
BaseLoadBalancer.chooseServer()
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
//根据负载均衡的算法rule,选择一种负载均衡算法,这么我们看默认的轮询算法 RoundRobinRule
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
这里我们看默认的轮询算法 RoundRobinRule
RoundRobinRule.choose()
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
int count = 0;
while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers();
//根据LoadBalancer获取所有的服务列表
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
//然后根据服务列表的大学递增获取下一个服务的Index
int nextServerIndex = incrementAndGetModulo(serverCount);
//根据Index获取服务的地址
server = allServers.get(nextServerIndex);
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
// Next.
server = null;
}
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}
public interface ILoadBalancer {
public void addServers(List<Server> newServers);
public Server chooseServer(Object key);
public void markServerDown(Server server);
@Deprecated
public List<Server> getServerList(boolean availableOnly);
public List<Server> getReachableServers();
//获取所有的服务列表
public List<Server> getAllServers();
}
lb.getAllServers()
这里的服务地址列表保存在哪里,又是在哪里初始化的呢?
BaseLoadBalancer.getAllServers()
发现在BaseLoadBalancer中有两个成员变量 allServerList,upServerList,一个用来存储所有的服务地址的列表,另一个存储更新的服务地址列表
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
//存储所有的服务地址的列表
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
//存储更新的服务地址列表
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());
@Override
public List<Server> getAllServers() {
return Collections.unmodifiableList(allServerList);
}
}
但是在BaseLoadBalancer中没有看到初始化allServerList,查看其子类 DynamicServerListLoadBalancer的构造方法中有一个restOfInit(clientConfig)方法
DynamicServerListLoadBalancer构造方法的restOfInit(clientConfig)
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
//开启一个定时任务,每隔30s更新一次服务地址的列表
enableAndInitLearnNewServersFeature();
//第一次获取所有的服务的地址列表
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
updateListOfServers()
第一次启动的时候获取所有Eureka实例列表
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
enableAndInitLearnNewServersFeature()
开启一个定时任务,每隔30s定时更新一次服务列表
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
//更新服务列表地址
updateListOfServers();
}
};
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
}
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
//调用上面的doUpdate更新
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
//initialDelayMs默认启动1s后执行,refreshIntervalMs默认每隔30s刷新一次
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
updateListOfServers()
更新服务地址的列表
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
serverListImpl.getUpdatedListOfServers()
从本地配置/Euraka注册中心上动态获取地址列表
ConfigurationBasedServerList.getInitialListOfServers() 从本地配置获取
public class ConfigurationBasedServerList extends AbstractServerList<Server> {
private IClientConfig clientConfig;
@Override
public List<Server> getInitialListOfServers() {
return getUpdatedListOfServers();
}
@Override
public List<Server> getUpdatedListOfServers() {
String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);
return derive(listOfServers);
}
}
clientConfig.get(CommonClientConfigKey.ListOfServers);
public abstract class CommonClientConfigKey<T> implements IClientConfigKey<T> {
public static final IClientConfigKey<String> ListOfServers = new CommonClientConfigKey<String>("listOfServers") {
};
}
和我们在application.properties中配置的key一样
# 配置指定服务的提供者的地址列表
spring-cloud-order-service.ribbon.listOfServers=\
localhost:8080,localhost:8082
DiscoveryEnabledNIWSServerList.getInitialListOfServers()从Eureka注册中心上获取服务地址列表
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// copy is necessary since the InstanceInfo builder just uses the original reference,
// and we don't want to corrupt the global eureka copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}
updateAllServerList(servers)
拿到服务列表后进行更新
protected void updateAllServerList(List<T> ls) {
// other threads might be doing this - in which case, we pass
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true); // set so that clients can start using these
// servers right away instead
// of having to wait out the ping cycle.
}
//重新设置服务地址列表
setServersList(ls);
super.forceQuickPing();
} finally {
serverListUpdateInProgress.set(false);
}
}
}
setServersList(ls);
@Override
public void setServersList(List lsrv) {
super.setServersList(lsrv);
List<T> serverList = (List<T>) lsrv;
Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
for (Server server : serverList) {
// make sure ServerStats is created to avoid creating them on hot
// path
getLoadBalancerStats().getSingleServerStat(server);
String zone = server.getZone();
if (zone != null) {
zone = zone.toLowerCase();
List<Server> servers = serversInZones.get(zone);
if (servers == null) {
servers = new ArrayList<Server>();
serversInZones.put(zone, servers);
}
servers.add(server);
}
}
setServerListForZones(serversInZones);
}
super.setServersList(lsrv);
因为我们的服务地址列表保存在父类 BaseLoadBalancer中,调用super.setServersList()更新服务列表到allServerList和upServerList
public void setServersList(List lsrv) {
......
ArrayList<Server> newServers = new ArrayList<Server>();
writeLock.lock();
try {
ArrayList<Server> allServers = new ArrayList<Server>();
for (Object server : lsrv) {
if (server == null) {
continue;
}
if (server instanceof String) {
server = new Server((String) server);
}
if (server instanceof Server) {
logger.debug("LoadBalancer [{}]: addServer [{}]", name, ((Server) server).getId());
allServers.add((Server) server);
} else {
throw new IllegalArgumentException(
"Type String or Server expected, instead found:"
+ server.getClass());
}
}
boolean listChanged = false;
if (!allServerList.equals(allServers)) {
listChanged = true;
if (changeListeners != null && changeListeners.size() > 0) {
List<Server> oldList = ImmutableList.copyOf(allServerList);
List<Server> newList = ImmutableList.copyOf(allServers);
for (ServerListChangeListener l: changeListeners) {
try {
l.serverListChanged(oldList, newList);
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
}
}
}
}
if (isEnablePrimingConnections()) {
for (Server server : allServers) {
if (!allServerList.contains(server)) {
server.setReadyToServe(false);
newServers.add((Server) server);
}
}
if (primeConnections != null) {
primeConnections.primeConnectionsAsync(newServers, this);
}
}
//重新赋值服务地址列表allServerList
allServerList = allServers;
if (canSkipPing()) {
for (Server s : allServerList) {
s.setAlive(true);
}
//重新赋值upServerList
upServerList = allServerList;
} else if (listChanged) {
forceQuickPing();
}
} finally {
writeLock.unlock();
}
}
url服务地址的替换
使用RestTemplate.getForObject()远程调用的url是: http://spring-cloud-order-service/orders
@GetMapping("/user/{id}")
public String findById(@PathVariable("id")int id){
return restTemplate.getForObject("http://spring-cloud-order-service/orders",String.class);
}
通过Ribbon的负载均衡算法之后会得到一个server,服务提供者的ip,port等信息,接下来看一下如何替换url
RibbonLoadBalancerClient.execute()
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
//根据serverName获取LoadBalancer负载均衡器
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
//根据负载均衡器获取指定的服务提供者的Server,包括ip,port
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
//拿到Server之后,就可以替换url了
return execute(serviceId, ribbonServer, request);
}
在调用另外一个execute重载方法,在这个方法中终会调用apply方法,这个方法会向一个具体的实例 发送请求。
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if (serviceInstance instanceof RibbonServer) {
server = ((RibbonServer) serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
//这里Request是一个Lambda表达式,这里的request实例为ServiceRequestWrapper
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
request.apply
request是LoadBalancerRequest接口,它里面提供了一个apply方法,但是从代码中我们发现这个方法 并没有实现类,那么它是在哪里实现的呢?
继续又往前分析发现,这个request对象是从LoadBalancerInterceptor的intercept方法中传递过来的.
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
}
而request的传递,是通过 this.requestFactory.createRequest(request, body, execution) 创 建二来,于是我们找到这个方法。
public class LoadBalancerRequestFactory {
public LoadBalancerRequest<ClientHttpResponse> createRequest(
final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) {
//HttpRequest实例是ServiceRequestWrapper
return instance -> {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
this.loadBalancer);
if (this.transformers != null) {
for (LoadBalancerRequestTransformer transformer : this.transformers) {
serviceRequest = transformer.transformRequest(serviceRequest,
instance);
}
}
return execution.execute(serviceRequest, body);
};
}
}
从代码中发现,它是一个用lambda表达式实现的匿名内部类。在该内部类中,创建了一个 ServiceRequestWrapper,这个ServiceRequestWrapper实际上就是HttpRequestWrapper的一个子 类,ServiceRequestWrapper重写了HttpRequestWrapper的getURI()方法,重写的URI实际上就是通 过调用LoadBalancerClient接口的reconstructURI函数来重新构建一个URI进行访问
AsyncLoadBalancerInterceptor
public class AsyncLoadBalancerInterceptor implements AsyncClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
public AsyncLoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
this.loadBalancer = loadBalancer;
}
public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request, final byte[] body, final AsyncClientHttpRequestExecution execution) throws IOException {
URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
//异步的apply()方法获取到实例为ServiceRequestWrapper
return (ListenableFuture)this.loadBalancer.execute(serviceName, new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {
public ListenableFuture<ClientHttpResponse> apply(final ServiceInstance instance) throws Exception {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, AsyncLoadBalancerInterceptor.this.loadBalancer);
return execution.executeAsync(serviceRequest, body);
}
});
}
}
InterceptingAsyncClientHttpRequest.executeAsync()
发起异步远程调用
public ListenableFuture<ClientHttpResponse> executeAsync(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
AsyncClientHttpRequestInterceptor interceptor = (AsyncClientHttpRequestInterceptor)this.iterator.next();
return interceptor.intercept(request, body, this);
} else {
URI uri = request.getURI();
HttpMethod method = request.getMethod();
HttpHeaders headers = request.getHeaders();
Assert.state(method != null, "No standard HTTP method");
AsyncClientHttpRequest delegate = InterceptingAsyncClientHttpRequest.this.requestFactory.createAsyncRequest(uri, method);
delegate.getHeaders().putAll(headers);
if (body.length > 0) {
StreamUtils.copy(body, delegate.getBody());
}
return delegate.executeAsync();
}
}
ServiceRequestWrapper.getURI() 重构Url
@Override
public URI getURI() {
URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
return uri;
}
RibbonLoadBalancerClient.reconstructURI()
reconstructURI这个方法,实际上是重构URI,也就是把一个 http://服务名/转化为 http://地址/ 的 过程。
- 首先获得一个serviceId
- 根据serviceId获得一个RibbonLoadBalancerContext对象,这个是用来存储一些被负载均衡器使 用的上下文内容。
- 调用reconstructURIWithServer方法来构建服务实例的URI
@Override
public URI reconstructURI(ServiceInstance instance, URI original) {
Assert.notNull(instance, "instance can not be null");
String serviceId = instance.getServiceId();
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
URI uri;
Server server;
if (instance instanceof RibbonServer) {
RibbonServer ribbonServer = (RibbonServer) instance;
server = ribbonServer.getServer();
uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
}
else {
server = new Server(instance.getScheme(), instance.getHost(),
instance.getPort());
IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
uri = updateToSecureConnectionIfNeeded(original, clientConfig,
serverIntrospector, server);
}
return context.reconstructURIWithServer(server, uri);
}
LoadBalancerContext.reconstructURIWithServer()
reconstructURIWithServer的实现逻辑比较好理解,首先从Server中获得host和port信息。然后将原始 的以服务名为host的uri替换为目标服务器的地址。
public URI reconstructURIWithServer(Server server, URI original) {
String host = server.getHost();
int port = server.getPort();
String scheme = server.getScheme();
if (host.equals(original.getHost())
&& port == original.getPort()
&& scheme == original.getScheme()) {
return original;
}
if (scheme == null) {
scheme = original.getScheme();
}
if (scheme == null) {
scheme = deriveSchemeAndPortFromPartialUri(original).first();
}
try {
StringBuilder sb = new StringBuilder();
sb.append(scheme).append("://");
if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
sb.append(original.getRawUserInfo()).append("@");
}
sb.append(host);
if (port >= 0) {
sb.append(":").append(port);
}
sb.append(original.getRawPath());
if (!Strings.isNullOrEmpty(original.getRawQuery())) {
sb.append("?").append(original.getRawQuery());
}
if (!Strings.isNullOrEmpty(original.getRawFragment())) {
sb.append("#").append(original.getRawFragment());
}
URI newURI = new URI(sb.toString());
return newURI;
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
Ping 检测服务列表是否正常运行(默认情况下不会验证)
DynamicServerListLoadBalancer的构造方法
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
//调用父类的构造方法
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
super(clientConfig, rule, ping);
void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
this.config = clientConfig;
String clientName = clientConfig.getClientName();
this.name = clientName;
int pingIntervalTime = Integer.parseInt(""
+ clientConfig.getProperty(
CommonClientConfigKey.NFLoadBalancerPingInterval,
Integer.parseInt("30")));
int maxTotalPingTime = Integer.parseInt(""
+ clientConfig.getProperty(
CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
Integer.parseInt("2")));
//设置Ping的间隔
setPingInterval(pingIntervalTime);
setMaxTotalPingTime(maxTotalPingTime);
// cross associate with each other
// i.e. Rule,Ping meet your container LB
// LB, these are your Ping and Rule guys ...
setRule(rule);
setPing(ping);
setLoadBalancerStats(stats);
rule.setLoadBalancer(this);
if (ping instanceof AbstractLoadBalancerPing) {
((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
}
logger.info("Client: {} instantiated a LoadBalancer: {}", name, this);
boolean enablePrimeConnections = clientConfig.get(
CommonClientConfigKey.EnablePrimeConnections, DefaultClientConfigImpl.DEFAULT_ENABLE_PRIME_CONNECTIONS);
if (enablePrimeConnections) {
this.setEnablePrimingConnections(true);
PrimeConnections primeConnections = new PrimeConnections(
this.getName(), clientConfig);
this.setPrimeConnections(primeConnections);
}
init();
}
setPingInterval
public void setPingInterval(int pingIntervalSeconds) {
if (pingIntervalSeconds < 1) {
return;
}
this.pingIntervalSeconds = pingIntervalSeconds;
if (logger.isDebugEnabled()) {
logger.debug("LoadBalancer [{}]: pingIntervalSeconds set to {}",
name, this.pingIntervalSeconds);
}
//开启Ping的定时任务
setupPingTask(); // since ping data changed
}
setupPingTask()
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
protected Timer lbTimer = null;
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
//开启定时任务,每10s去检测一次服务列表是否正常运行
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing();
}
}
PingTask.run()
class PingTask extends TimerTask {
public void run() {
try {
new Pinger(pingStrategy).runPinger();
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error pinging", name, e);
}
}
}
new Pinger(pingStrategy).runPinger();
public void runPinger() throws Exception {
if (!pingInProgress.compareAndSet(false, true)) {
return; // Ping in progress - nothing to do
}
// we are "in" - we get to Ping
Server[] allServers = null;
boolean[] results = null;
Lock allLock = null;
Lock upLock = null;
try {
/*
* The readLock should be free unless an addServer operation is
* going on...
*/
allLock = allServerLock.readLock();
//获取读写锁
allLock.lock();
//获取服务地址列表
allServers = allServerList.toArray(new Server[allServerList.size()]);
allLock.unlock();
int numCandidates = allServers.length;
results = pingerStrategy.pingServers(ping, allServers);
final List<Server> newUpList = new ArrayList<Server>();
final List<Server> changedServers = new ArrayList<Server>();
for (int i = 0; i < numCandidates; i++) {
boolean isAlive = results[i];
Server svr = allServers[i];
boolean oldIsAlive = svr.isAlive();
svr.setAlive(isAlive);
if (oldIsAlive != isAlive) {
changedServers.add(svr);
logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
}
if (isAlive) {
newUpList.add(svr);
}
}
upLock = upServerLock.writeLock();
upLock.lock();
upServerList = newUpList;
upLock.unlock();
notifyServerStatusChangeListener(changedServers);
} finally {
pingInProgress.set(false);
}
}
pingServers()
public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);
for (int i = 0; i < numCandidates; i++) {
results[i] = false; /* Default answer is DEAD. */
try {
// NOTE: IFF we were doing a real ping
// assuming we had a large set of servers (say 15)
// the logic below will run them serially
// hence taking 15 times the amount of time it takes
// to ping each server
// A better method would be to put this in an executor
// pool
// But, at the time of this writing, we dont REALLY
// use a Real Ping (its mostly in memory eureka call)
// hence we can afford to simplify this design and run
// this
// serially
if (ping != null) {
//for循环遍历ping这个节点是否存活
results[i] = ping.isAlive(servers[i]);
}
} catch (Exception e) {
logger.error("Exception while pinging Server: '{}'", servers[i], e);
}
}
return results;
}
PingUrl.isAlive()
相当于是心跳机制,每隔一段时间发起一次请求,判断返回的Code==200,如果是200说明还存活着,反之。
public boolean isAlive(Server server) {
String urlStr = "";
if (this.isSecure) {
urlStr = "https://";
} else {
urlStr = "http://";
}
urlStr = urlStr + server.getId();
urlStr = urlStr + this.getPingAppendString();
boolean isAlive = false;
HttpClient httpClient = new DefaultHttpClient();
HttpUriRequest getRequest = new HttpGet(urlStr);
String content = null;
try {
HttpResponse response = httpClient.execute(getRequest);
content = EntityUtils.toString(response.getEntity());
isAlive = response.getStatusLine().getStatusCode() == 200;
if (this.getExpectedContent() != null) {
LOGGER.debug("content:" + content);
if (content == null) {
isAlive = false;
} else if (content.equals(this.getExpectedContent())) {
isAlive = true;
} else {
isAlive = false;
}
}
} catch (IOException var11) {
var11.printStackTrace();
} finally {
getRequest.abort();
}
return isAlive;
}
自定义实现Ping
添加自定义PING的代码,实现IPing接口,重新isAlive()方法
public class MyPing implements IPing{
@Override
public boolean isAlive(Server server) {
System.out.println("isAlive"+server.getHost()+":"+server.getPort());
return true;
}
}
修改配置(注意,以下配置只需要关心倒数第一个和第二个即可)
#配置服务器列表
MyRibbonClient.ribbon.listOfServers=localhost:8080,localhost:8002
# 开启okHttp的HTTP客户端
ribbon.okhttp.enabled=true
# 关闭HttpCliet的Http客户端,默认采用的jdk HttpUrlConnection
ribbon.http.client.enabled=false
#配置负载均衡规则IRule的实现类
MyRibbonClient.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.WeightedResponseTimeRule
#配置负载均衡实现类
MyRibbonClient.ribbon.NFLoadBalancerClassName=com.netflix.loadbalancer.ZoneAwareLoadBalancer
#配置IPing的实现类
MyRibbonClient.ribbon.NFLoadBalancerPingClassName=org.lixue.ribbon.client.MyPing
#配置Ping操作的间隔
MyRibbonClient.ribbon.NFLoadBalancerPingInterval=2
可以参考spring官网,地址:spring-cloud-Netflix-Ribbon
图解
下一篇: HDU 1272 小希的迷宫(并查集)