EUREKA服务注册源码品读
程序员文章站
2022-03-25 10:56:35
由于不同版本的springcloub源码可能会有一下差异,以下源代码都取自:springcloub的Greenwich.SR1版本服务注册流程分析首先先看`@EnableDiscoveryClient`这个注解,发现这个注解中并没有实现的代码,但是在这个注解中引用了一个`EnableDiscoveryClientImportSelector.class`这个类,那么我们就去这个类中看看都有些什么东西。/** Copyright 2012-2019 the original auth....
由于不同版本的springcloub源码可能会有一下差异,以下源代码都取自:springcloub的Greenwich.SR1版本
如果对于eureka基础知识还不是很了解可以看这篇文章:服务治理(EUREKA)
服务注册流程分析
首先先看`@EnableDiscoveryClient`这个注解,发现这个注解中并没有实现的代码,但是在这个注解中引用了一个`EnableDiscoveryClientImportSelector.class`这个类,那么我们就去这个类中看看都有些什么东西。
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.client.discovery;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.context.annotation.Import;
/**
* Annotation to enable a DiscoveryClient implementation.
* @author Spencer Gibb
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
/**
* If true, the ServiceRegistry will automatically register the local server.
* @return - {@code true} if you want to automatically register.
*/
boolean autoRegister() default true;
}
我们进入`EnableDiscoveryClientImportSelector.class`这个类,在这个类的selectImports方法中会注入一个新的配置类org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.client.discovery;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import org.springframework.cloud.commons.util.SpringFactoryImportSelector;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.core.annotation.Order;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.type.AnnotationMetadata;
/**
* @author Spencer Gibb
*/
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
extends SpringFactoryImportSelector<EnableDiscoveryClient> {
@Override
public String[] selectImports(AnnotationMetadata metadata) {
String[] imports = super.selectImports(metadata);
AnnotationAttributes attributes = AnnotationAttributes.fromMap(
metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));
boolean autoRegister = attributes.getBoolean("autoRegister");
if (autoRegister) {
List<String> importsList = new ArrayList<>(Arrays.asList(imports));
importsList.add(
"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
imports = importsList.toArray(new String[0]);
}
else {
Environment env = getEnvironment();
if (ConfigurableEnvironment.class.isInstance(env)) {
ConfigurableEnvironment configEnv = (ConfigurableEnvironment) env;
LinkedHashMap<String, Object> map = new LinkedHashMap<>();
map.put("spring.cloud.service-registry.auto-registration.enabled", false);
MapPropertySource propertySource = new MapPropertySource(
"springCloudDiscoveryClient", map);
configEnv.getPropertySources().addLast(propertySource);
}
}
return imports;
}
@Override
protected boolean isEnabled() {
return getEnvironment().getProperty("spring.cloud.discovery.enabled",
Boolean.class, Boolean.TRUE);
}
@Override
protected boolean hasDefaultFactory() {
return true;
}
}
那我们就来看一下这个config中都有哪些内容,在这个类中会自动装配AutoServiceRegistrationProperties.class这个类.
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.client.serviceregistry;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @author Spencer Gibb
*/
@Configuration
@EnableConfigurationProperties(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public class AutoServiceRegistrationConfiguration {
}
而在这个类中最终会将EurekaAutoServiceRegistration类注入到ApplicationContext中,这样在springboot启动的时候就会调用EurekaAutoServiceRegistration中的satrt方法。
@Override
public void start() {
// only set the port if the nonSecurePort or securePort is 0 and this.port != 0
if (this.port.get() != 0) {
if (this.registration.getNonSecurePort() == 0) {
this.registration.setNonSecurePort(this.port.get());
}
if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
this.registration.setSecurePort(this.port.get());
}
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
this.serviceRegistry.register(this.registration);
this.context.publishEvent(new InstanceRegisteredEvent<>(this,
this.registration.getInstanceConfig()));
this.running.set(true);
}
}
而在这个方法中会往上线文中注入一个InstanceRegisteredEvent事件,最会就会调用到DiscoveryClient中的register方法,而这个可以说是真正开始进行服务注册
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
这里要提一下,Eureka的服务注册使用的装饰者和代理模式,所有的实现类都实现了EurekaHttpClientDecorator类,大家可以看一下这个类中的register方法,在这个方法中会执行execute方法,而这个方法在EurekaHttpClientDecorator中是一个抽象类,在每一个实现类中都有自己的一套实现方案。
可能大家被绕晕了,这里我用一个图来讲一下,DiscoveryClient是要调用register方法的,register方法需要调用里面的execute方法,在EurekaHttpClientDecorator中execute方法是一个抽象方法,所以需要去实现类中调用execute方法,调用完后在返回register方法中,依次调用。
@Override
public EurekaHttpResponse<Void> register(final InstanceInfo info) {
return execute(new RequestExecutor<Void>() {
@Override
public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {
return delegate.register(info);
}
@Override
public RequestType getRequestType() {
return RequestType.Register;
}
});
}
EurekaHttpClientDecorator实现类总共分为4层,第一层为SessionedEurekaHttpClient,这层主要是获取需要带有session的eureka客户端信息,也是防止一个eureka客户端一直连接一台服务器,为高可用做铺垫
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
long now = System.currentTimeMillis();
long delay = now - lastReconnectTimeStamp;
if (delay >= currentSessionDurationMs) {
logger.debug("Ending a session and starting anew");
lastReconnectTimeStamp = now;
currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
}
EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();
if (eurekaHttpClient == null) {
eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
}
return requestExecutor.execute(eurekaHttpClient);
}
第二层为RetryableEurekaHttpClient,这层主要是用来在服务端的列表中获取一个可用的服务端信息
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
List<EurekaEndpoint> candidateHosts = null;
int endpointIdx = 0;
for (int retry = 0; retry < numberOfRetries; retry++) {
EurekaHttpClient currentHttpClient = delegate.get();
EurekaEndpoint currentEndpoint = null;
if (currentHttpClient == null) {
if (candidateHosts == null) {
candidateHosts = getHostCandidates();
if (candidateHosts.isEmpty()) {
throw new TransportException("There is no known eureka server; cluster server list is empty");
}
}
if (endpointIdx >= candidateHosts.size()) {
throw new TransportException("Cannot execute request on any known server");
}
currentEndpoint = candidateHosts.get(endpointIdx++);
currentHttpClient = clientFactory.newClient(currentEndpoint);
}
try {
EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
delegate.set(currentHttpClient);
if (retry > 0) {
logger.info("Request execution succeeded on retry #{}", retry);
}
return response;
}
logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
} catch (Exception e) {
logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace
}
// Connection error or 5xx from the server that must be retried on another server
delegate.compareAndSet(currentHttpClient, null);
if (currentEndpoint != null) {
quarantineSet.add(currentEndpoint);
}
}
throw new TransportException("Retry limit reached; giving up on completing the request");
}
第三层为RedirectingEurekaHttpClient层,这层主要是要来寻找非302重定向的 Eureka-Server 的 EurekaHttpClient 。
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
EurekaHttpClient currentEurekaClient = delegateRef.get();
if (currentEurekaClient == null) {
AtomicReference<EurekaHttpClient> currentEurekaClientRef = new AtomicReference<>(factory.newClient(serviceEndpoint));
try {
EurekaHttpResponse<R> response = executeOnNewServer(requestExecutor, currentEurekaClientRef);
TransportUtils.shutdown(delegateRef.getAndSet(currentEurekaClientRef.get()));
return response;
} catch (Exception e) {
logger.error("Request execution error. endpoint={}", serviceEndpoint, e);
TransportUtils.shutdown(currentEurekaClientRef.get());
throw e;
}
} else {
try {
return requestExecutor.execute(currentEurekaClient);
} catch (Exception e) {
logger.error("Request execution error. endpoint={}", serviceEndpoint, e);
delegateRef.compareAndSet(currentEurekaClient, null);
currentEurekaClient.shutdown();
throw e;
}
}
}
private <R> EurekaHttpResponse<R> executeOnNewServer(RequestExecutor<R> requestExecutor,
AtomicReference<EurekaHttpClient> currentHttpClientRef) {
URI targetUrl = null;
for (int followRedirectCount = 0; followRedirectCount < MAX_FOLLOWED_REDIRECTS; followRedirectCount++) {
EurekaHttpResponse<R> httpResponse = requestExecutor.execute(currentHttpClientRef.get());
if (httpResponse.getStatusCode() != 302) {
if (followRedirectCount == 0) {
logger.debug("Pinning to endpoint {}", targetUrl);
} else {
logger.info("Pinning to endpoint {}, after {} redirect(s)", targetUrl, followRedirectCount);
}
return httpResponse;
}
targetUrl = getRedirectBaseUri(httpResponse.getLocation());
if (targetUrl == null) {
throw new TransportException("Invalid redirect URL " + httpResponse.getLocation());
}
currentHttpClientRef.getAndSet(null).shutdown();
currentHttpClientRef.set(factory.newClient(new DefaultEndpoint(targetUrl.toString())));
}
String message = "Follow redirect limit crossed for URI " + serviceEndpoint.getServiceUrl();
logger.warn(message);
throw new TransportException(message);
}
第四层为MetricsCollectingEurekaHttpClient层,这层主要是监控指标收集 EurekaHttpClient ,配合 Netflix Servo 实现监控信息采集。
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
EurekaHttpClientRequestMetrics requestMetrics = metricsByRequestType.get(requestExecutor.getRequestType());
Stopwatch stopwatch = requestMetrics.latencyTimer.start();
try {
EurekaHttpResponse<R> httpResponse = requestExecutor.execute(delegate);
requestMetrics.countersByStatus.get(mappedStatus(httpResponse)).increment();
return httpResponse;
} catch (Exception e) {
requestMetrics.connectionErrors.increment();
exceptionsMetric.count(e);
throw e;
} finally {
stopwatch.stop();
}
}
最后调用AbstractJerseyEurekaHttpClient,去真正的向eureka服务端发送请求的方法了。
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
本文地址:https://blog.csdn.net/qq_38446413/article/details/112237131
推荐阅读
-
springcloud干货之服务注册与发现(Eureka)
-
SpringCloud之服务注册与发现Spring Cloud Eureka实例代码
-
Eureka获取服务列表源码解析
-
Eureka(服务注册与发现)简单入门
-
SpringCloud-微服务的注册与发现Eureka
-
SpringCloud微服务实战:一、Eureka注册中心服务端
-
SpringCloud(二):服务的注册与发现(Eureka)
-
SpringCloud:1章 Eureka服务注册与发现
-
Eureka源码探索(一)-客户端服务端的启动和负载均衡
-
SpringCloud-服务注册与实现-Eureka创建服务注册中心(附源码下载)