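A quick look through the source at how FeignClient implements inter-service requests
How FeignClient implements inter-service requests
1. EnableFeignClients
The @Import annotation imports FeignClientsRegistrar.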
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients {
//...
}
2. FeignClientsRegistrar
FeignClientsRegistrar implements the ImportBeanDefinitionRegistrar interface. Its registerBeanDefinitions method registers each client as a FeignClientFactoryBean bean definition.
public void registerBeanDefinitions(AnnotationMetadata metadata,
BeanDefinitionRegistry registry) {
registerDefaultConfiguration(metadata, registry);
registerFeignClients(metadata, registry);
}
public void registerFeignClients(AnnotationMetadata metadata,
BeanDefinitionRegistry registry) {
//....
// the method that registers a single Feign client
registerFeignClient(registry, annotationMetadata, attributes);
//...
}
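// This method registers Feign's factory class, FeignClientFactoryBean.
// For a FactoryBean, the container exposes the object returned by getObject() as the bean, so a class can implement the interface to customize how the bean is created.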
private void registerFeignClient(BeanDefinitionRegistry registry,
AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
String className = annotationMetadata.getClassName();
BeanDefinitionBuilder definition = BeanDefinitionBuilder
.genericBeanDefinition(FeignClientFactoryBean.class);
validate(attributes);
definition.addPropertyValue("url", getUrl(attributes));
definition.addPropertyValue("path", getPath(attributes));
String name = getName(attributes);
definition.addPropertyValue("name", name);
definition.addPropertyValue("type", className);
definition.addPropertyValue("decode404", attributes.get("decode404"));
definition.addPropertyValue("fallback", attributes.get("fallback"));
definition.addPropertyValue("fallbackFactory", attributes.get("fallbackFactory"));
definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
String alias = name + "FeignClient";
AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
boolean primary = (Boolean)attributes.get("primary"); // has a default, won't be null
beanDefinition.setPrimary(primary);
String qualifier = getQualifier(attributes);
if (StringUtils.hasText(qualifier)) {
alias = qualifier;
}
BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
new String[] { alias });
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}
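To make the FactoryBean idea concrete, here is a minimal sketch in plain Java, without Spring. SimpleFactoryBean, SimpleContainer and GreetingClient are hypothetical names invented for this illustration; the real container does considerably more (scopes, caching, autowiring).

```java
import java.util.HashMap;
import java.util.Map;

class FactoryBeanSketch {
    // Hypothetical stand-in for Spring's FactoryBean<T>.
    interface SimpleFactoryBean<T> {
        T getObject();
    }

    // Hypothetical registry: a name -> definition map, loosely like a bean registry.
    static class SimpleContainer {
        private final Map<String, Object> definitions = new HashMap<>();

        void register(String name, Object definition) {
            definitions.put(name, definition);
        }

        // If the registered object is a factory, hand out its product, not the factory itself.
        Object getBean(String name) {
            Object bean = definitions.get(name);
            if (bean instanceof SimpleFactoryBean) {
                return ((SimpleFactoryBean<?>) bean).getObject();
            }
            return bean;
        }
    }

    // Hypothetical client interface standing in for a @FeignClient interface.
    interface GreetingClient {
        String greet(String who);
    }

    static Object lookupDemo() {
        SimpleContainer container = new SimpleContainer();
        // The factory is what gets registered; the client is what callers receive.
        SimpleFactoryBean<GreetingClient> factory = () -> who -> "hello " + who;
        container.register("greetingClient", factory);
        return container.getBean("greetingClient");
    }

    public static void main(String[] args) {
        GreetingClient client = (GreetingClient) lookupDemo();
        System.out.println(client.greet("feign"));
    }
}
```

The point of the indirection is that the registrar only needs to describe the factory (name, url, path, type), and the actual client object is built later, when the bean is requested.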
3. FeignClientFactoryBean
3.1. getObject() returns the instance that is exposed as the bean
@Override
public Object getObject() throws Exception {
return getTarget();
}
<T> T getTarget() {
FeignContext context = applicationContext.getBean(FeignContext.class);
Feign.Builder builder = feign(context);
if (!StringUtils.hasText(this.url)) {
String url;
if (!this.name.startsWith("http")) {
url = "http://" + this.name;
}
else {
url = this.name;
}
url += cleanPath();
return (T) loadBalance(builder, context, new HardCodedTarget<>(this.type,
this.name, url));
}
if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
this.url = "http://" + this.url;
}
String url = this.url + cleanPath();
Client client = getOptional(context, Client.class);
if (client != null) {
if (client instanceof LoadBalancerFeignClient) {
// not load balancing because we have a url,
// but ribbon is on the classpath, so unwrap
client = ((LoadBalancerFeignClient)client).getDelegate();
}
builder.client(client);
}
Targeter targeter = get(context, Targeter.class);
return (T) targeter.target(this, builder, context, new HardCodedTarget<>(
this.type, this.name, url));
}
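The two branches of getTarget() differ mainly in how the base URL is built. The sketch below isolates that logic; resolveUrl and its parameters are hypothetical names, and path is assumed to be already cleaned (the job cleanPath() does in the original).

```java
class TargetUrlSketch {
    // Mirrors the branch structure of getTarget(); not the library's own method.
    static String resolveUrl(String name, String url, String path) {
        if (url == null || url.trim().isEmpty()) {
            // No explicit url: the service name becomes the host,
            // to be resolved to a real address by the load balancer later.
            String base = name.startsWith("http") ? name : "http://" + name;
            return base + path;
        }
        // Explicit url: the request goes straight to it, without load balancing.
        String base = url.startsWith("http") ? url : "http://" + url;
        return base + path;
    }

    public static void main(String[] args) {
        System.out.println(resolveUrl("user-service", "", "/api"));
        System.out.println(resolveUrl("user-service", "localhost:8080", "/api"));
    }
}
```

In the first call the host is only a logical name, which is why that branch has to go through loadBalance.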
3.2. The loadBalance method
Steps ① and ② look up beans from the Spring container through the FeignContext.
protected <T> T loadBalance(Feign.Builder builder, FeignContext context,
HardCodedTarget<T> target) {
// ① the default is LoadBalancerFeignClient
Client client = getOptional(context, Client.class);
if (client != null) {
builder.client(client);
// ② the default is HystrixTargeter
Targeter targeter = get(context, Targeter.class);
return targeter.target(this, builder, context, target);
}
throw new IllegalStateException(
"No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-netflix-ribbon?");
}
4. HystrixTargeter
4.1. The target method
@Override
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context,
Target.HardCodedTarget<T> target) {
if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) {
return feign.target(target);
}
feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign;
SetterFactory setterFactory = getOptional(factory.getName(), context,
SetterFactory.class);
if (setterFactory != null) {
builder.setterFactory(setterFactory);
}
Class<?> fallback = factory.getFallback();
if (fallback != void.class) {
return targetWithFallback(factory.getName(), context, target, builder, fallback);
}
Class<?> fallbackFactory = factory.getFallbackFactory();
if (fallbackFactory != void.class) {
return targetWithFallbackFactory(factory.getName(), context, target, builder, fallbackFactory);
}
return feign.target(target);
}
4.2. feign.target(target)
public <T> T target(Target<T> target) {
return build().newInstance(target);
}
public Feign build() {
SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,
logLevel, decode404, closeAfterDecode);
ParseHandlersByName handlersByName =
new ParseHandlersByName(contract, options, encoder, decoder, queryMapEncoder,
errorDecoder, synchronousMethodHandlerFactory);
return new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
}
5. ReflectiveFeign
5.1. The newInstance method
① builds a handler for each proxied method
② creates the proxy object
@SuppressWarnings("unchecked")
@Override
public <T> T newInstance(Target<T> target) {
// ① build the method handlers
Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();
for (Method method : target.type().getMethods()) {
if (method.getDeclaringClass() == Object.class) {
continue;
} else if(Util.isDefault(method)) {
DefaultMethodHandler handler = new DefaultMethodHandler(method);
defaultMethodHandlers.add(handler);
methodToHandler.put(method, handler);
} else {
methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
}
}
InvocationHandler handler = factory.create(target, methodToHandler);
// ② create the proxy object
T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[]{target.type()}, handler);
for(DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
defaultMethodHandler.bindTo(proxy);
}
return proxy;
}
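The same pattern, a JDK dynamic proxy that dispatches each call through a Method-to-handler map, can be sketched in a few lines. Handler and UserClient are hypothetical names; the handler here only echoes the call, where Feign's handler would build and send an HTTP request.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

class ProxyDispatchSketch {
    // Hypothetical counterpart of a per-method handler.
    interface Handler {
        Object invoke(Object[] args) throws Throwable;
    }

    // Hypothetical client interface.
    interface UserClient {
        String findName(long id);
    }

    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> type) {
        // Step 1: one handler per interface method.
        Map<Method, Handler> methodToHandler = new LinkedHashMap<>();
        for (Method method : type.getMethods()) {
            if (method.getDeclaringClass() == Object.class) {
                continue;
            }
            methodToHandler.put(method, args -> method.getName() + Arrays.toString(args));
        }
        // Step 2: a proxy whose invocation handler looks up the handler for the called method.
        InvocationHandler dispatcher = (proxy, method, args) -> {
            Handler handler = methodToHandler.get(method);
            if (handler == null) {
                throw new UnsupportedOperationException(method.getName());
            }
            return handler.invoke(args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, dispatcher);
    }

    public static void main(String[] args) {
        UserClient client = newInstance(UserClient.class);
        System.out.println(client.findName(42L)); // prints "findName[42]"
    }
}
```

Callers only ever see the interface; which code actually runs is decided by the map lookup at call time.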
5.2. The apply method -> factory.create
apply calls factory.create, which returns a SynchronousMethodHandler instance.
public MethodHandler create(Target<?> target, MethodMetadata md,
RequestTemplate.Factory buildTemplateFromArgs,
Options options, Decoder decoder, ErrorDecoder errorDecoder) {
return new SynchronousMethodHandler(target, client, retryer, requestInterceptors, logger,
logLevel, md, buildTemplateFromArgs, options, decoder,
errorDecoder, decode404, closeAfterDecode);
}
6. SynchronousMethodHandler
6.1. It implements the invoke method, which is where the proxied request is made
@Override
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv);
Retryer retryer = this.retryer.clone();
while (true) {
try {
return executeAndDecode(template);
} catch (RetryableException e) {
retryer.continueOrPropagate(e);
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}
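The retry loop in invoke has a simple shape: try, and on a retryable failure ask the retryer whether to go around again or give up. A minimal sketch follows, with hypothetical names (RetryableFailure, AttemptBudget) and with no delay between attempts, which a production retryer would normally add.

```java
import java.util.function.Supplier;

class RetryLoopSketch {
    // Hypothetical marker for failures that are worth retrying.
    static class RetryableFailure extends RuntimeException {
        RetryableFailure(String message) {
            super(message);
        }
    }

    // Hypothetical retry budget, playing the role the retryer plays in invoke().
    static class AttemptBudget {
        private final int maxAttempts;
        private int attempt = 1;

        AttemptBudget(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        // Returns normally to signal "try again"; rethrows once the budget is spent.
        void continueOrPropagate(RetryableFailure e) {
            if (attempt++ >= maxAttempts) {
                throw e;
            }
        }
    }

    static <T> T invoke(Supplier<T> call, int maxAttempts) {
        AttemptBudget budget = new AttemptBudget(maxAttempts);
        while (true) {
            try {
                return call.get();
            } catch (RetryableFailure e) {
                budget.continueOrPropagate(e);
            }
        }
    }

    // Fails twice, then succeeds on the third attempt.
    static String demo() {
        int[] calls = {0};
        return invoke(() -> {
            if (++calls[0] < 3) {
                throw new RetryableFailure("attempt " + calls[0] + " failed");
            }
            return "ok after " + calls[0] + " attempts";
        }, 5);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "ok after 3 attempts"
    }
}
```

A fresh budget is created per call, which matches the clone() of the retryer at the top of invoke: retry state is never shared between requests.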
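This shows that calls go through the interface annotated with @FeignClient, but not yet how the name registered in the service registry is resolved to the list of IPs and ports for that service.
6.2. executeAndDecode
The call goes out through the client, which is the LoadBalancerFeignClient from earlier.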
//...
try {
response = client.execute(request, options);
// ensure the request is set. TODO: remove in Feign 10
response.toBuilder().request(request).build();
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
throw errorExecuting(request, e);
}
//...
7. LoadBalancerFeignClient
7.1. The execute method, which delegates to executeWithLoadBalancer
@Override
public Response execute(Request request, Request.Options options) throws IOException {
try {
URI asUri = URI.create(request.url());
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
IClientConfig requestConfig = getClientConfig(options, clientName);
return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
requestConfig).toResponse();
}
catch (ClientException e) {
IOException io = findIOException(e);
if (io != null) {
throw io;
}
throw new RuntimeException(e);
}
}
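The key trick in execute is that the host part of the request URL is not a real host but the service name. The sketch below shows that extraction with java.net.URI, plus a hypothetical withServer helper illustrating how a chosen server could later be substituted back in; it is an illustration of the idea, not the library's own code.

```java
import java.net.URI;
import java.net.URISyntaxException;

class ServiceUriSketch {
    // The "host" of a load-balanced request URL is really the service name.
    static String clientName(String requestUrl) {
        return URI.create(requestUrl).getHost();
    }

    // Hypothetical helper: swap the service name for a concrete host and port,
    // keeping scheme, path and query unchanged.
    static URI withServer(String requestUrl, String host, int port) {
        URI original = URI.create(requestUrl);
        try {
            return new URI(original.getScheme(), null, host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        String url = "http://user-service/users/1?active=true";
        System.out.println(clientName(url));
        System.out.println(withServer(url, "10.0.0.5", 8080));
    }
}
```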
7.2. executeWithLoadBalancer, which uses RxJava (Observable)
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
7.3. The submit method; its argument is just an anonymous inner class instance, so we skip it for now
8. LoadBalancerCommand
8.1. The submit method
//...
server == null ? selectServer() : Observable.just(server)
//...
This server is the instance that carries the concrete IP and port.
8.2. selectServer
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
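The Server returned here is picked by a rule from the list of instances that has already been fetched. By default the servers are chosen round-robin (in Spring Cloud Netflix the default IRule is ZoneAvoidanceRule, which round-robins over the servers that pass its zone filter).
The list itself is fetched by ZoneAwareLoadBalancer, the ILoadBalancer implementation, when it is created in the Spring container for the FeignClient.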
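For intuition, round-robin selection over an already-fetched list can be sketched as below. This is a simplified illustration with servers represented as plain strings; the actual rule implementations also deal with server health and zones.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {
    private final AtomicInteger next = new AtomicInteger(0);

    // Pick servers in rotation; floorMod keeps the index valid even if the counter overflows.
    String choose(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalStateException("no servers available");
        }
        int index = Math.floorMod(next.getAndIncrement(), servers.size());
        return servers.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch rule = new RoundRobinSketch();
        List<String> servers = Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080");
        for (int i = 0; i < 3; i++) {
            System.out.println(rule.choose(servers));
        }
    }
}
```

The list is passed in on every call, so a refreshed server list takes effect on the next request without resetting the counter.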
9. ZoneAwareLoadBalancer
9.1. Configuration class
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
The constructor:
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}
// up to the parent class, DynamicServerListLoadBalancer
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
The restOfInit method:
void restOfInit(IClientConfig clientConfig) {
//...
updateListOfServers();
//...
}
The updateListOfServers method fetches the servers through the ServerList implementation:
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
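Stripped of logging, updateListOfServers is "fetch from the source, then filter". A minimal sketch of that shape follows, using a Supplier as a hypothetical stand-in for the ServerList and a Predicate as a stand-in for the ServerListFilter, with servers as plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

class ServerListRefreshSketch {
    // source: where the current instances come from (registry, k8s, Consul, ...).
    // filter: optional narrowing step, applied only when a source exists.
    static List<String> refresh(Supplier<List<String>> source, Predicate<String> filter) {
        List<String> servers = new ArrayList<>();
        if (source != null) {
            servers = new ArrayList<>(source.get());
            if (filter != null) {
                servers.removeIf(filter.negate());
            }
        }
        return servers;
    }

    public static void main(String[] args) {
        List<String> known = Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080", "10.1.0.9:8080");
        System.out.println(refresh(() -> known, s -> s.startsWith("10.0.")));
    }
}
```

Because the source is just an interface, swapping the registry means swapping one bean, which is what the implementations listed next do.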
The serverList bean injected above has several implementations, including KubernetesServerList (k8s ribbon) and ConsulServerList (consul ribbon).
KubernetesServerList
In k8s the servers are read from the service's Endpoints.
public List<Server> getUpdatedListOfServers() {
Endpoints endpoints = this.namespace != null
? this.client.endpoints().inNamespace(this.namespace)
.withName(this.serviceId).get()
: this.client.endpoints().withName(this.serviceId).get();
//...
}
ConsulServerList
Here the servers are fetched through ConsulClient.
@Override
public List<ConsulServer> getUpdatedListOfServers() {
return getServers();
}
private List<ConsulServer> getServers() {
if (this.client == null) {
return Collections.emptyList();
}
String tag = getTag(); // null is ok
Response<List<HealthService>> response = this.client.getHealthServices(
this.serviceId, tag, this.properties.isQueryPassing(),
createQueryParamsForClientRequest(), this.properties.getAclToken());
if (response.getValue() == null || response.getValue().isEmpty()) {
return Collections.emptyList();
}
return transformResponse(response.getValue());
}
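This is where the registered service name is resolved to concrete IPs and ports.
So the @FeignClient annotation works because a proxy is generated for the annotated interface: each call to a proxied method is turned into a request, the registered service name is resolved to a list of IPs and ports, and the request is sent to one of them through the load balancer.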