A Brief Look at the Source Code: How FeignClient Implements Service-to-Service Requests

程序员文章站 2024-02-04 21:06:58

How FeignClient implements service-to-service requests

1. EnableFeignClients

The @Import annotation on @EnableFeignClients imports FeignClientsRegistrar.

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients {
    //...
}
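To make the role of the import concrete, here is a minimal plain-Java sketch of the idea: an enabling annotation carries a meta-annotation that names a registrar class, and a bootstrap step reads it reflectively and runs the registrar. Every name in it (ImportLike, EnableGreeters, Registrar, GreeterRegistrar, bootstrap) is hypothetical, and Spring's real @Import processing is considerably more involved.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

final class ImportSketch {

    /** Hypothetical stand-in for ImportBeanDefinitionRegistrar. */
    interface Registrar {
        void register(List<String> registry);
    }

    /** Hypothetical stand-in for Spring's @Import. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.ANNOTATION_TYPE)
    @interface ImportLike {
        Class<? extends Registrar> value();
    }

    /** Hypothetical enabling annotation, shaped like @EnableFeignClients. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @ImportLike(GreeterRegistrar.class)
    @interface EnableGreeters {
    }

    /** Registrar that records one bean name, standing in for bean definitions. */
    static final class GreeterRegistrar implements Registrar {
        @Override
        public void register(List<String> registry) {
            registry.add("greeterFactoryBean");
        }
    }

    @EnableGreeters
    static final class App {
    }

    /** Reads meta-annotations on the config class and runs each imported registrar. */
    static List<String> bootstrap(Class<?> configClass) {
        List<String> registry = new ArrayList<>();
        for (Annotation annotation : configClass.getAnnotations()) {
            ImportLike imported = annotation.annotationType().getAnnotation(ImportLike.class);
            if (imported == null) {
                continue;
            }
            try {
                Registrar registrar = imported.value().getDeclaredConstructor().newInstance();
                registrar.register(registry);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate registrar", e);
            }
        }
        return registry;
    }

    public static void main(String[] args) {
        System.out.println(bootstrap(App.class)); // prints [greeterFactoryBean]
    }
}
```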

2. FeignClientsRegistrar

FeignClientsRegistrar implements the ImportBeanDefinitionRegistrar interface and its registerBeanDefinitions method, and registers each client through FeignClientFactoryBean.

public void registerBeanDefinitions(AnnotationMetadata metadata,
			BeanDefinitionRegistry registry) {
		registerDefaultConfiguration(metadata, registry);
		registerFeignClients(metadata, registry);
}

public void registerFeignClients(AnnotationMetadata metadata,
			BeanDefinitionRegistry registry) {
    //....
    // registers the bean definition for one Feign client
    registerFeignClient(registry, annotationMetadata, attributes);
    //...
}

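// This method registers Feign's factory class, FeignClientFactoryBean.
// With a FactoryBean, the container exposes the object returned by getObject();
// a bean implements this interface to customize how it is created.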
private void registerFeignClient(BeanDefinitionRegistry registry,
			AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
    String className = annotationMetadata.getClassName();
    BeanDefinitionBuilder definition = BeanDefinitionBuilder
        .genericBeanDefinition(FeignClientFactoryBean.class);
    validate(attributes);
    definition.addPropertyValue("url", getUrl(attributes));
    definition.addPropertyValue("path", getPath(attributes));
    String name = getName(attributes);
    definition.addPropertyValue("name", name);
    definition.addPropertyValue("type", className);
    definition.addPropertyValue("decode404", attributes.get("decode404"));
    definition.addPropertyValue("fallback", attributes.get("fallback"));
    definition.addPropertyValue("fallbackFactory", attributes.get("fallbackFactory"));
    definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);

    String alias = name + "FeignClient";
    AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();

    boolean primary = (Boolean)attributes.get("primary"); // has a default, won't be null

    beanDefinition.setPrimary(primary);

    String qualifier = getQualifier(attributes);
    if (StringUtils.hasText(qualifier)) {
        alias = qualifier;
    }

    BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
                                                           new String[] { alias });
    BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}
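The FactoryBean idea can be sketched without Spring. In the hypothetical code below, MiniFactoryBean, MiniContainer, ClientFactory and UserClient are all invented for illustration; the only point is that the container stores the factory but hands out whatever getObject() returns.

```java
import java.util.HashMap;
import java.util.Map;

final class FactoryBeanSketch {

    /** Hypothetical stand-in for Spring's FactoryBean. */
    interface MiniFactoryBean<T> {
        T getObject();
    }

    /** Hypothetical client interface, playing the role of a @FeignClient interface. */
    interface UserClient {
        String urlFor(int id);
    }

    /** Factory configured through properties, loosely like "name" and "path". */
    static final class ClientFactory implements MiniFactoryBean<UserClient> {
        private final String name;
        private final String path;

        ClientFactory(String name, String path) {
            this.name = name;
            this.path = path;
        }

        @Override
        public UserClient getObject() {
            String base = "http://" + name + path;
            // The product is what callers receive when they ask for the bean.
            return id -> base + "/" + id;
        }
    }

    /** Hypothetical container: stores definitions and unwraps factories on lookup. */
    static final class MiniContainer {
        private final Map<String, Object> definitions = new HashMap<>();

        void register(String beanName, Object definition) {
            definitions.put(beanName, definition);
        }

        Object getBean(String beanName) {
            Object candidate = definitions.get(beanName);
            if (candidate instanceof MiniFactoryBean) {
                return ((MiniFactoryBean<?>) candidate).getObject();
            }
            return candidate;
        }
    }

    public static void main(String[] args) {
        MiniContainer container = new MiniContainer();
        container.register("userClient", new ClientFactory("user-service", "/users"));
        UserClient client = (UserClient) container.getBean("userClient");
        System.out.println(client.urlFor(7)); // prints http://user-service/users/7
    }
}
```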

3. FeignClientFactoryBean

3.1. The instance returned by getObject() becomes the bean

@Override
public Object getObject() throws Exception {
    return getTarget();
}

<T> T getTarget() {
    FeignContext context = applicationContext.getBean(FeignContext.class);
    Feign.Builder builder = feign(context);

    if (!StringUtils.hasText(this.url)) {
        String url;
        if (!this.name.startsWith("http")) {
            url = "http://" + this.name;
        }
        else {
            url = this.name;
        }
        url += cleanPath();
        return (T) loadBalance(builder, context, new HardCodedTarget<>(this.type,
                                                                       this.name, url));
    }
    if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
        this.url = "http://" + this.url;
    }
    String url = this.url + cleanPath();
    Client client = getOptional(context, Client.class);
    if (client != null) {
        if (client instanceof LoadBalancerFeignClient) {
            // not load balancing because we have a url,
            // but ribbon is on the classpath, so unwrap
            client = ((LoadBalancerFeignClient)client).getDelegate();
        }
        builder.client(client);
    }
    Targeter targeter = get(context, Targeter.class);
    return (T) targeter.target(this, builder, context, new HardCodedTarget<>(
        this.type, this.name, url));
}
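The branching in getTarget() boils down to one decision: with no url, the service name becomes the host and the call is load balanced; with a url, the call goes straight to it. A small sketch of that decision follows. The Resolved holder and the resolve function are hypothetical, and the cleanPath normalization (add a leading slash, drop a trailing one) is an assumption, since the original method body is not shown above.

```java
final class TargetUrlSketch {

    /** Result of the decision: the final URL and whether load balancing applies. */
    static final class Resolved {
        final String url;
        final boolean loadBalanced;

        Resolved(String url, boolean loadBalanced) {
            this.url = url;
            this.loadBalanced = loadBalanced;
        }
    }

    /** Assumed normalization: leading slash added, trailing slash removed. */
    static String cleanPath(String path) {
        String p = path == null ? "" : path.trim();
        if (!p.isEmpty()) {
            if (!p.startsWith("/")) {
                p = "/" + p;
            }
            if (p.endsWith("/")) {
                p = p.substring(0, p.length() - 1);
            }
        }
        return p;
    }

    /** Mirrors the two branches of getTarget(): by name (balanced) or by url (direct). */
    static Resolved resolve(String name, String url, String path) {
        boolean hasUrl = url != null && !url.trim().isEmpty();
        if (!hasUrl) {
            String base = name.startsWith("http") ? name : "http://" + name;
            return new Resolved(base + cleanPath(path), true);
        }
        String base = url.startsWith("http") ? url : "http://" + url;
        return new Resolved(base + cleanPath(path), false);
    }

    public static void main(String[] args) {
        Resolved byName = resolve("user-service", "", "api/");
        System.out.println(byName.url + " balanced=" + byName.loadBalanced);
        Resolved byUrl = resolve("user-service", "localhost:8080", "/v1");
        System.out.println(byUrl.url + " balanced=" + byUrl.loadBalanced);
    }
}
```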

3.2. The loadBalance method

At ① and ②, the beans are looked up from the Spring container through the ApplicationContext.

protected <T> T loadBalance(Feign.Builder builder, FeignContext context,
			HardCodedTarget<T> target) {
    // ① by default this is LoadBalancerFeignClient
    Client client = getOptional(context, Client.class);
    if (client != null) {
        builder.client(client);
        // ② by default this is HystrixTargeter
        Targeter targeter = get(context, Targeter.class);
        return targeter.target(this, builder, context, target);
    }

    throw new IllegalStateException(
        "No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-netflix-ribbon?");
}

4. HystrixTargeter

4.1. The target method

@Override
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context,
                    Target.HardCodedTarget<T> target) {
    if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) {
        return feign.target(target);
    }
    feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign;
    SetterFactory setterFactory = getOptional(factory.getName(), context,
                                              SetterFactory.class);
    if (setterFactory != null) {
        builder.setterFactory(setterFactory);
    }
    Class<?> fallback = factory.getFallback();
    if (fallback != void.class) {
        return targetWithFallback(factory.getName(), context, target, builder, fallback);
    }
    Class<?> fallbackFactory = factory.getFallbackFactory();
    if (fallbackFactory != void.class) {
        return targetWithFallbackFactory(factory.getName(), context, target, builder, fallbackFactory);
    }

    return feign.target(target);
}
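The fallback branch can be pictured with a plain JDK proxy that answers from a second implementation when the primary call throws. This is only a rough sketch of the fallback idea: Hystrix also provides timeouts, circuit breaking and thread isolation, none of which appear here, and PriceClient and withFallback are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

final class FallbackSketch {

    /** Hypothetical client interface. */
    interface PriceClient {
        int price(String sku);
    }

    /** Wraps the primary client so that a failing call is answered by the fallback. */
    static <T> T withFallback(Class<T> type, T primary, T fallback) {
        InvocationHandler handler = (proxy, method, args) -> {
            try {
                return method.invoke(primary, args);
            } catch (InvocationTargetException primaryFailure) {
                // The primary call threw: answer from the fallback instead.
                return method.invoke(fallback, args);
            }
        };
        return type.cast(Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler));
    }

    public static void main(String[] args) {
        PriceClient failing = sku -> {
            throw new IllegalStateException("remote service is down");
        };
        PriceClient fallback = sku -> -1;
        PriceClient client = withFallback(PriceClient.class, failing, fallback);
        System.out.println(client.price("sku-1")); // prints -1
    }
}
```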

4.2. feign.target(target)

public <T> T target(Target<T> target) {
    return build().newInstance(target);
}

public Feign build() {
    SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
        new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,
                                             logLevel, decode404, closeAfterDecode);
    ParseHandlersByName handlersByName =
        new ParseHandlersByName(contract, options, encoder, decoder, queryMapEncoder,
                                errorDecoder, synchronousMethodHandlerFactory);
    return new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
}

5. ReflectiveFeign

5.1. The newInstance method

① Build a method handler for each proxied method

② Create the proxy object

@SuppressWarnings("unchecked")
@Override
public <T> T newInstance(Target<T> target) {
    // ① build the handlers for the proxied methods
    Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
    Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
    List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();

    for (Method method : target.type().getMethods()) {
        if (method.getDeclaringClass() == Object.class) {
            continue;
        } else if(Util.isDefault(method)) {
            DefaultMethodHandler handler = new DefaultMethodHandler(method);
            defaultMethodHandlers.add(handler);
            methodToHandler.put(method, handler);
        } else {
            methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
        }
    }
    InvocationHandler handler = factory.create(target, methodToHandler);
    // ② create the proxy object
    T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[]{target.type()}, handler);

    for(DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
        defaultMethodHandler.bindTo(proxy);
    }
    return proxy;
}
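The two steps of newInstance can be reproduced in miniature with the JDK's own Proxy class. The sketch below is an illustration under simplifying assumptions: handlers are keyed by plain method name rather than by Feign's config key, default methods and the Object methods (toString, equals, hashCode) are not handled, and EchoClient, MethodHandler and demoClient are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.LinkedHashMap;
import java.util.Map;

final class ProxySketch {

    /** Hypothetical per-method handler, similar in shape to Feign's MethodHandler. */
    interface MethodHandler {
        Object invoke(Object[] argv) throws Throwable;
    }

    /** Hypothetical client interface to be proxied. */
    interface EchoClient {
        String echo(String message);

        int add(int a, int b);
    }

    static <T> T newInstance(Class<T> type, Map<String, MethodHandler> nameToHandler) {
        // Step 1: map every interface method to its handler.
        Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<>();
        for (Method method : type.getMethods()) {
            if (method.getDeclaringClass() == Object.class) {
                continue;
            }
            methodToHandler.put(method, nameToHandler.get(method.getName()));
        }
        // Step 2: create a proxy whose invocation handler dispatches through the map.
        InvocationHandler handler = (proxy, method, args) -> {
            MethodHandler target = methodToHandler.get(method);
            if (target == null) {
                throw new UnsupportedOperationException(method.toString());
            }
            return target.invoke(args);
        };
        return type.cast(Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler));
    }

    static EchoClient demoClient() {
        Map<String, MethodHandler> handlers = new LinkedHashMap<>();
        handlers.put("echo", argv -> "echo:" + argv[0]);
        handlers.put("add", argv -> ((Integer) argv[0]) + ((Integer) argv[1]));
        return newInstance(EchoClient.class, handlers);
    }

    public static void main(String[] args) {
        EchoClient client = demoClient();
        System.out.println(client.echo("hi")); // prints echo:hi
        System.out.println(client.add(2, 3));  // prints 5
    }
}
```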

5.2. The apply method -> factory.create

This returns a SynchronousMethodHandler instance.

public MethodHandler create(Target<?> target, MethodMetadata md,
                                RequestTemplate.Factory buildTemplateFromArgs,
                                Options options, Decoder decoder, ErrorDecoder errorDecoder) {
    return new SynchronousMethodHandler(target, client, retryer, requestInterceptors, logger,
                                        logLevel, md, buildTemplateFromArgs, options, decoder,
                                        errorDecoder, decode404, closeAfterDecode);
}

6. SynchronousMethodHandler

6.1. The invoke method, which performs the proxied request

@Override
public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Retryer retryer = this.retryer.clone();
    while (true) {
        try {
            return executeAndDecode(template);
        } catch (RetryableException e) {
            retryer.continueOrPropagate(e);
            if (logLevel != Logger.Level.NONE) {
                logger.logRetry(metadata.configKey(), logLevel);
            }
            continue;
        }
    }
}
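The loop in invoke is a retry-until-the-retryer-gives-up pattern. Here is a minimal sketch of it, assuming a retryer that only counts attempts; the Retryer, RetryableException and invoke names below are simplified, hypothetical stand-ins, and unlike a real retryer this one does not wait between attempts.

```java
import java.util.function.Supplier;

final class RetrySketch {

    /** Hypothetical marker for failures that are worth retrying. */
    static final class RetryableException extends RuntimeException {
        RetryableException(String message) {
            super(message);
        }
    }

    /** Hypothetical retryer: allows up to maxAttempts calls in total. */
    static final class Retryer {
        private final int maxAttempts;
        private int attempt = 1;

        Retryer(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        /** Returns normally to allow another attempt, or rethrows to stop. */
        void continueOrPropagate(RetryableException e) {
            if (attempt++ >= maxAttempts) {
                throw e;
            }
        }

        /** Each invocation gets its own counter, like retryer.clone() above. */
        Retryer copy() {
            return new Retryer(maxAttempts);
        }
    }

    static <T> T invoke(Supplier<T> call, Retryer prototype) {
        Retryer retryer = prototype.copy();
        while (true) {
            try {
                return call.get();
            } catch (RetryableException e) {
                retryer.continueOrPropagate(e);
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = invoke(() -> {
            if (++calls[0] < 3) {
                throw new RetryableException("flaky");
            }
            return "ok";
        }, new Retryer(5));
        System.out.println(result + " after " + calls[0] + " calls"); // prints ok after 3 calls
    }
}
```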

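This shows how a call on an interface annotated with @FeignClient is dispatched, but not yet how the name registered in the service registry is turned into a list of IP addresses and ports for the target service.

6.2. executeAndDecode

The request is sent through the client, which is the LoadBalancerFeignClient seen earlier.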

//...
try {
    response = client.execute(request, options);
    // ensure the request is set. TODO: remove in Feign 10
    response.toBuilder().request(request).build();
} catch (IOException e) {
    if (logLevel != Logger.Level.NONE) {
        logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
    }
    throw errorExecuting(request, e);
}
//...

7. LoadBalancerFeignClient

7.1. The execute method, which delegates to executeWithLoadBalancer

@Override
public Response execute(Request request, Request.Options options) throws IOException {
    try {
        URI asUri = URI.create(request.url());
        String clientName = asUri.getHost();
        URI uriWithoutHost = cleanUrl(request.url(), clientName);
        FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
            this.delegate, request, uriWithoutHost);

        IClientConfig requestConfig = getClientConfig(options, clientName);
        return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
                                                            requestConfig).toResponse();
    }
    catch (ClientException e) {
        IOException io = findIOException(e);
        if (io != null) {
            throw io;
        }
        throw new RuntimeException(e);
    }
}
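The first lines of execute treat the host part of the request URL as the logical service name and then drop it from the URI. A sketch with java.net.URI is shown below. The withoutHost helper is a hypothetical simplification that keeps only path and query; it is not a reproduction of the cleanUrl method called above.

```java
import java.net.URI;

final class ClientNameSketch {

    /** The host part of the Feign URL doubles as the logical service name. */
    static String clientName(String url) {
        return URI.create(url).getHost();
    }

    /** Keeps only path and query, leaving the host to be filled in later. */
    static URI withoutHost(String url) {
        URI uri = URI.create(url);
        String query = uri.getRawQuery() == null ? "" : "?" + uri.getRawQuery();
        return URI.create(uri.getRawPath() + query);
    }

    public static void main(String[] args) {
        String url = "http://user-service/users/1?verbose=true";
        System.out.println(clientName(url));  // prints user-service
        System.out.println(withoutHost(url)); // prints /users/1?verbose=true
    }
}
```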

7.2. executeWithLoadBalancer, which is built on RxJava (Observable)

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

    try {
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        Throwable t = e.getCause();
        if (t instanceof ClientException) {
            throw (ClientException) t;
        } else {
            throw new ClientException(e);
        }
    }

}
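Stripped of the Observable plumbing, the method does three things: obtain a server, rebuild the URI with that server's host and port, and execute the request. The following synchronous sketch shows that flow. Server, reconstructUriWithServer and the transport function are hypothetical, and the scheme is hard-coded to http as a simplifying assumption.

```java
import java.net.URI;
import java.util.function.Function;
import java.util.function.Supplier;

final class LoadBalancedCallSketch {

    /** Hypothetical server record: just a host and a port. */
    static final class Server {
        final String host;
        final int port;

        Server(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Fills the chosen server into a host-less request URI (scheme assumed to be http). */
    static URI reconstructUriWithServer(Server server, URI original) {
        String query = original.getRawQuery() == null ? "" : "?" + original.getRawQuery();
        return URI.create("http://" + server.host + ":" + server.port
                + original.getRawPath() + query);
    }

    /** Picks a server, rewrites the URI, then hands the request to the transport. */
    static <T> T executeWithLoadBalancer(Supplier<Server> selector, URI requestUri,
                                         Function<URI, T> transport) {
        Server server = selector.get();
        if (server == null) {
            throw new IllegalStateException("No server available");
        }
        return transport.apply(reconstructUriWithServer(server, requestUri));
    }

    public static void main(String[] args) {
        String result = executeWithLoadBalancer(
                () -> new Server("10.0.0.5", 8080),
                URI.create("/users/1?verbose=true"),
                uri -> "GET " + uri);
        System.out.println(result); // prints GET http://10.0.0.5:8080/users/1?verbose=true
    }
}
```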

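7.3. Next comes the submit method; the argument passed to it above is just an anonymous inner class instance, so we set it aside for now

8. LoadBalancerCommand

8.1. The submit method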

//...
server == null ? selectServer() : Observable.just(server)
//...

This server is the instance that carries the concrete IP and port.

8.2. The selectServer method

private Observable<Server> selectServer() {
    return Observable.create(new OnSubscribe<Server>() {
        @Override
        public void call(Subscriber<? super Server> next) {
            try {
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                next.onNext(server);
                next.onCompleted();
            } catch (Exception e) {
                next.onError(e);
            }
        }
    });
}

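The Server obtained here is picked from the already fetched list of instances according to a rule, and the selection is round-robin by default. Note that in Spring Cloud's Ribbon configuration the default IRule is ZoneAvoidanceRule, which still picks round-robin among the servers that pass its filters.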
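Round-robin selection itself is small enough to sketch in a few lines. The class below is a hypothetical illustration using an atomic counter; it ignores server health and zones, which real rules take into account.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {

    /** Hypothetical chooser: hands out servers in order, wrapping around at the end. */
    static final class RoundRobinChooser {
        private final AtomicInteger counter = new AtomicInteger();

        String choose(List<String> servers) {
            if (servers == null || servers.isEmpty()) {
                return null;
            }
            // floorMod keeps the index valid even after the counter overflows.
            int index = Math.floorMod(counter.getAndIncrement(), servers.size());
            return servers.get(index);
        }
    }

    public static void main(String[] args) {
        RoundRobinChooser chooser = new RoundRobinChooser();
        List<String> servers = Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080");
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.choose(servers));
        }
    }
}
```

With three servers, the fourth call wraps around and returns the first server again.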

The server list itself is fetched by ZoneAwareLoadBalancer, the ILoadBalancer implementation that is registered in the Spring container.

9. ZoneAwareLoadBalancer

9.1. Configuration class (RibbonClientConfiguration)

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                                        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                                        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                                       serverListFilter, serverListUpdater);
}

The constructor:

public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
                                 IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
                                 ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}

// up to the parent class, DynamicServerListLoadBalancer
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping);
    this.serverListImpl = serverList;
    this.filter = filter;
    this.serverListUpdater = serverListUpdater;
    if (filter instanceof AbstractServerListFilter) {
        ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
    }
    restOfInit(clientConfig);
}

The restOfInit method:

void restOfInit(IClientConfig clientConfig) {
    //...
    updateListOfServers();
   //...
}

The updateListOfServers method obtains the servers from the ServerList implementation:

@VisibleForTesting
public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                getIdentifier(), servers);

        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);
        }
    }
    updateAllServerList(servers);
}
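The shape of this update step (fetch from a ServerList, optionally filter, then publish the result) can be sketched independently of Ribbon. The interfaces below reuse the method names shown above but are hypothetical, simplified versions that represent a server as a plain "host:port" string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

final class ServerListSketch {

    /** Hypothetical, simplified source of servers (a registry lookup in practice). */
    interface ServerList {
        List<String> getUpdatedListOfServers();
    }

    /** Hypothetical, simplified filter applied to the fetched servers. */
    interface ServerListFilter {
        List<String> getFilteredListOfServers(List<String> servers);
    }

    static final class DynamicBalancer {
        private final ServerList serverList;
        private final ServerListFilter filter;
        private volatile List<String> allServers = Collections.emptyList();

        DynamicBalancer(ServerList serverList, ServerListFilter filter) {
            this.serverList = serverList;
            this.filter = filter;
            updateListOfServers(); // initial load, in the spirit of restOfInit
        }

        /** Fetches, filters, and publishes the current server list. */
        void updateListOfServers() {
            List<String> servers = new ArrayList<>();
            if (serverList != null) {
                servers = serverList.getUpdatedListOfServers();
                if (filter != null) {
                    servers = filter.getFilteredListOfServers(servers);
                }
            }
            allServers = Collections.unmodifiableList(new ArrayList<>(servers));
        }

        List<String> getAllServers() {
            return allServers;
        }
    }

    public static void main(String[] args) {
        ServerList registry = () -> Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080", "10.0.1.9:8080");
        ServerListFilter sameSubnet = servers -> servers.stream()
                .filter(s -> s.startsWith("10.0.0."))
                .collect(Collectors.toList());
        DynamicBalancer balancer = new DynamicBalancer(registry, sameSubnet);
        System.out.println(balancer.getAllServers()); // prints [10.0.0.1:8080, 10.0.0.2:8080]
    }
}
```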

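The serverList bean that the load balancer depends on has several implementations, for example KubernetesServerList (Kubernetes Ribbon) and ConsulServerList (Consul Ribbon).

KubernetesServerList

In Kubernetes, the servers are obtained from the service's Endpoints.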

public List<Server> getUpdatedListOfServers() {
    Endpoints endpoints = this.namespace != null
            ? this.client.endpoints().inNamespace(this.namespace)
                    .withName(this.serviceId).get()
            : this.client.endpoints().withName(this.serviceId).get();
    //...
}

ConsulServerList

Here the servers are obtained through ConsulClient.

@Override
public List<ConsulServer> getUpdatedListOfServers() {
    return getServers();
}
private List<ConsulServer> getServers() {
    if (this.client == null) {
        return Collections.emptyList();
    }
    String tag = getTag(); // null is ok
    Response<List<HealthService>> response = this.client.getHealthServices(
        this.serviceId, tag, this.properties.isQueryPassing(),
        createQueryParamsForClientRequest(), this.properties.getAclToken());
    if (response.getValue() == null || response.getValue().isEmpty()) {
        return Collections.emptyList();
    }
    return transformResponse(response.getValue());
}

This is where the concrete IP and port information is looked up by the name under which the microservice is registered.

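So @FeignClient works as follows: a proxy class is generated for the annotated interface, calls to its methods are intercepted by the proxy, the registered service name is resolved to a list of concrete IP addresses and ports, and the request is sent to one of them through the load balancer.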

