SpringCloud源码学习笔记之Eureka客户端——初始化
1、Eureka客户端初始化流程
- 启动类上的@EnableDiscoveryClient注解,通过@Import引入了EnableDiscoveryClientImportSelector配置类,然后该配置类又通过selectImports()方法,注入AutoServiceRegistrationConfiguration类。
- 基于SpringBoot自动装配机制,从spring-cloud-netflix-eureka-client.jar包下的spring.factories文件配置文件中装载初始化需要的配置类。
- 首先,分析加载的配置类EurekaDiscoveryClientConfiguration,该配置类主要在符合@ConditionalOnXXX条件的情况下,加载了一些实例对象包括了EurekaDiscoveryClient(用于和Eureka服务端进行交互,内部通过EurekaClient实现)、EurekaHealthCheckHandler(实例运行状态处理器)、EurekaClientConfigurationRefresher(ApplicationListener实现类,用于监听RefreshScopeRefreshedEvent事件),最后还有一个Marker,该类实例对象原来用于控制加载EurekaClientAutoConfiguration 配置类,现在已经不用了。
- 然后,分析加载的EurekaClientAutoConfiguration配置类,该类主要注入Eureka客户端配置对象、服务实例配置对象、EurekaClient对象(实际上是CloudEurekaClient对象)、EurekaRegistration对象、EurekaAutoServiceRegistration对象、EurekaHealthIndicator对象等。
- 在EurekaClientAutoConfiguration配置类加载的EurekaAutoServiceRegistration对象,是一个实现了SmartLifecycle接口的对象,所以会在初始化之后,执行start()方法。
- 在上述start()方法中,首先调用EurekaServiceRegistry类的register()方法
- 而EurekaServiceRegistry类的register()方法中,又调用了maybeInitializeClient()方法,然后再调用setInstanceStatus()方法,最后调用registerHealthCheck()方法。
- 在上述start()方法中,执行完register()方法后,会调用publishEvent()方法,发送实例注册事件(InstanceRegisteredEvent)。
- 除了EurekaAutoServiceRegistration这条线之外,还有就是实例化EurekaDiscoveryClient对象,在初始化该对象是需要注入EurekaClient对象,而EurekaClient对象就是在配置文件EurekaClientAutoConfiguration配置类中通过@Bean注入的一个对象,实际上使用的是CloudEurekaClient对象。
- 而CloudEurekaClient又是 DiscoveryClient类的实现类,所以在构建对象时,就会调用DiscoveryClient类的构造函数,所在在前面《DiscoveryClient接口的层级结构》中提到的那个构造函数就会被调用,其实Eureka实例的初始化工作,很多就在这个构造函数中完成了。
- 在DiscoveryClient构造函数中,首先初始化一些变量,包括调度执行器ScheduledExecutorService变量scheduler,ThreadPoolExecutor类型的心跳执行器(heartbeatExecutor)、缓存刷新执行器(heartbeatExecutor),EurekaTransport对象(基于Jersey开发的用来和Eureka服务交互的Http方法)等。
- 初始化对象之后,根据条件(默认true),调用fetchRegistry()拉去Eureka服务端信息
- 在fetchRegistry()方法中,有根据条件判断,进行全量或增量拉去信息,分别调用getAndStoreFullRegistry()或getAndUpdateDelta()方法。
- 拉去服务端注册信息后,根据条件(默认为false),进行服务注册,调用register()方法实现。
- 之后,再调用initScheduledTasks()方法,完成一些定时任务的执行。
- 在initScheduledTasks()方法中,首先,通过 scheduler.schedule()方法,执行TimedSupervisorTask任务,实际上执行的是CacheRefreshThread类中的run()方法,在run()方法中,又调用refreshRegistry()方法,进行Eureka服务端信息的定时刷新,其中又调用了fetchRegistry()方法,实现了数据的拉去。
- 定期拉去服务注册信息后,就启动了定时刷新心跳动作,实际逻辑在HeartbeatThread类中的run()方法中完成,实际由renew()方法完成。
- 然后创建InstanceInfoReplicator对象,并创建状态监听器ApplicationInfoManager.StatusChangeListener对象
- 然后根据配置onDemandUpdateStatusChange(默认true),把监听器对象配置到InstanceInfoReplicator对象中。
- 最后,instanceInfoReplicator.start()方法,开启InstanceInfoReplicator线程的执行
- 因为InstanceInfoReplicator是Runnable接口的实现类,所以在通过start()方法启动后,就会执行run()方法,
- 在run()方法中,首先会调用discoveryClient.refreshInstanceInfo()刷新本地实例信息,然后再调用discoveryClient.register()进行注册。在run()方法中的finally代码块中,又调用了scheduler.schedule()方法,实现了定期注册。
2、@EnableDiscoveryClient注解
该注解用于进行服务发现,其实在新版中该注解已经可以省略。其中,autoRegister属性用于配置是否启用自动注册的功能。作为服务发现实例,主要是注入DiscoveryClient实例,用于服务发现。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
boolean autoRegister() default true;
}
2.1、EnableDiscoveryClientImportSelector
在@EnableDiscoveryClient注解上,通过@Import注解引入了EnableDiscoveryClientImportSelector类,通过selectImports()方法,返回了需要注入到容器中的AutoServiceRegistrationConfiguration对象。实现如下:
@Override
public String[] selectImports(AnnotationMetadata metadata) {
String[] imports = super.selectImports(metadata);
AnnotationAttributes attributes = AnnotationAttributes.fromMap(
metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));
boolean autoRegister = attributes.getBoolean("autoRegister");
if (autoRegister) {
List<String> importsList = new ArrayList<>(Arrays.asList(imports));
importsList.add(
"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
imports = importsList.toArray(new String[0]);
}
else {
Environment env = getEnvironment();
if (ConfigurableEnvironment.class.isInstance(env)) {
ConfigurableEnvironment configEnv = (ConfigurableEnvironment) env;
LinkedHashMap<String, Object> map = new LinkedHashMap<>();
map.put("spring.cloud.service-registry.auto-registration.enabled", false);
MapPropertySource propertySource = new MapPropertySource(
"springCloudDiscoveryClient", map);
configEnv.getPropertySources().addLast(propertySource);
}
}
return imports;
}
2.2、AutoServiceRegistrationConfiguration
通过上述selectImports()注入了AutoServiceRegistrationConfiguration对象。该类中的代码,已经迁移到了AutoServiceRegistrationAutoConfiguration类,而AutoServiceRegistrationAutoConfiguration类将通过SpringBoot加载META-INF/spring.factories文件中配置类的方法进行加载
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
public class AutoServiceRegistrationConfiguration {
}
AutoServiceRegistrationAutoConfiguration 配置类,主要是用来判断初始化是否成功,判断依据是:Spring容器中是否已经注入了AutoServiceRegistration对象,该对象是在EurekaClientAutoConfiguration配置类中进行加载的。
@Configuration(proxyBeanMethods = false)
@Import(AutoServiceRegistrationConfiguration.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
public class AutoServiceRegistrationAutoConfiguration {
@Autowired(required = false)
private AutoServiceRegistration autoServiceRegistration;
@Autowired
private AutoServiceRegistrationProperties properties;
@PostConstruct
protected void init() {
if (this.autoServiceRegistration == null && this.properties.isFailFast()) {
throw new IllegalStateException("Auto Service Registration has "
+ "been requested, but there is no AutoServiceRegistration bean");
}
}
}
3、基于SpringBoot的自动装配机制
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapConfiguration
在spring-cloud-netflix-eureka-client.jar包下的spring.factories文件中,定义了好多自动配置类,如上所示,包括reactive相关的、负载均衡相关的、服务发现配置和配置中心相关的、还有用于服务发现和服务注册的配置类EurekaClientAutoConfiguration和EurekaDiscoveryClientConfiguration,我们后续就是通过这个两个类来分析Eureka客户端的初始化过程。
4、自动配置类EurekaDiscoveryClientConfiguration
该类在spring.factories文件中配置,会通过SpringBoot自动装配机制进行加载。该配置类主要在符合@ConditionalOnXXX条件的情况下,加载了一些实例对象包括了EurekaDiscoveryClient(用于和Eureka服务端进行交互,内部通过EurekaClient实现)、EurekaHealthCheckHandler(实例运行状态处理器)、EurekaClientConfigurationRefresher(ApplicationListener实现类,用于监听RefreshScopeRefreshedEvent事件),最后还有一个Marker,该类实例对象原来用于控制是否加载EurekaClientAutoConfiguration 配置类,现在已经不用了。
5、自动配置类EurekaClientAutoConfiguration
该类主要注入Eureka客户端配置对象、服务实例配置对象、EurekaClient对象(实际上是CloudEurekaClient对象)、EurekaRegistration对象、EurekaAutoServiceRegistration对象、EurekaHealthIndicator对象等。
5.1、EurekaAutoServiceRegistration对象
在EurekaClientAutoConfiguration配置类中通过@Bean注解加载EurekaAutoServiceRegistration对象,该对象是一个实现了SmartLifecycle接口的类实例,所以当Spring容器加载所有bean并完成初始化之后,会接着回调该对象的start()方法。
关于SmartLifecycle接口用法,请参考 《Spring SmartLifecycle 在容器所有bean加载和初始化完毕执行》。
//EurekaClientAutoConfiguration配置类
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(
value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(
ApplicationContext context, EurekaServiceRegistry registry,
EurekaRegistration registration) {
return new EurekaAutoServiceRegistration(context, registry, registration);
}
EurekaAutoServiceRegistration对象的start()方法:
//EurekaAutoServiceRegistration类
public class EurekaAutoServiceRegistration implements AutoServiceRegistration,
SmartLifecycle, Ordered, SmartApplicationListener {
//省略其他……
@Override
public void start() {
// 判断协议类型,配置使用的端口号
if (this.port.get() != 0) {
if (this.registration.getNonSecurePort() == 0) {
this.registration.setNonSecurePort(this.port.get());
}
if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
this.registration.setSecurePort(this.port.get());
}
}
// 判断当前实例的运行状态,(为什么只判断NonSecurePort端口,不判断SecurePort?)
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
//注册EurekaRegistration对象,
this.serviceRegistry.register(this.registration);
this.context.publishEvent(new InstanceRegisteredEvent<>(this,
this.registration.getInstanceConfig()));
this.running.set(true);
}
}
}
在start()方法中,主要是调用是调用serviceRegistry.register()方法完成注册,然后发布一个InstanceRegisteredEvent事件,最后修改本地对象的running状态。
5.2、EurekaServiceRegistry对象
5.2.1、EurekaServiceRegistry的register()方法
在EurekaAutoServiceRegistration对象的start()方法中,主要实现了EurekaRegistration对象(服务实例)注册。
@Override
public void register(EurekaRegistration reg) {
//初始化客户端信息,主要用来初始化cloudEurekaClient==null情况
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application "
+ reg.getApplicationInfoManager().getInfo().getAppName()
+ " with eureka with status "
+ reg.getInstanceConfig().getInitialStatus());
}
reg.getApplicationInfoManager()
.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
//注册实例健康检查器
reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg
.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
5.2.2、EurekaServiceRegistry的maybeInitializeClient()方法
主要在CloudEurekaClient 对象为空的情况下,进行初始化,实际通过调用EurekaRegistration对象的getEurekaClient()方法实现。
private void maybeInitializeClient(EurekaRegistration reg) {
// force initialization of possibly scoped proxies
reg.getApplicationInfoManager().getInfo();
reg.getEurekaClient().getApplications();
}
EurekaRegistration对象的getEurekaClient()方法,主要完成了CloudEurekaClient 对象的初始化(如果不存在的情况下)。
public CloudEurekaClient getEurekaClient() {
if (this.cloudEurekaClient.get() == null) {
try {
this.cloudEurekaClient.compareAndSet(null,
getTargetObject(eurekaClient, CloudEurekaClient.class));
}
catch (Exception e) {
log.error("error getting CloudEurekaClient", e);
}
}
return this.cloudEurekaClient.get();
}
5.2.3、ApplicationInfoManager的setInstanceStatus()方法
该方法主要用来设置实例的状态,该状态用来表示该实例是否可以接收信息。同时会通知所有的监听器一个状态改变的事件消息StatusChangeEvent。
public synchronized void setInstanceStatus(InstanceStatus status) {
InstanceStatus next = instanceStatusMapper.map(status);
if (next == null) {
return;
}
InstanceStatus prev = instanceInfo.setStatus(next);
if (prev != null) {
for (StatusChangeListener listener : listeners.values()) {
try {
listener.notify(new StatusChangeEvent(prev, next));
} catch (Exception e) {
logger.warn("failed to notify listener: {}", listener.getId(), e);
}
}
}
}
6、CloudEurekaClient、DiscoveryClient实例化
除了EurekaAutoServiceRegistration这条线之外,还有就是实例化EurekaDiscoveryClient对象,在初始化该对象是需要注入EurekaClient对象,而EurekaClient对象就是在配置文件EurekaClientAutoConfiguration配置类中通过@Bean注入的一个对象,实际上使用的是CloudEurekaClient对象。
而CloudEurekaClient又是 DiscoveryClient类的实现类,所以在构建对象时,就会调用DiscoveryClient类的构造函数,所在在前面《DiscoveryClient接口的层级结构》中提到的那个构造函数就会被调用,其实Eureka实例的初始化工作,很多就在这个构造函数中完成了,我们这里主要分析initScheduledTasks()方法中的逻辑。
6.1、initScheduledTasks()方法——服务获取
initScheduledTasks()方法是用来初始化定时任务,其中包括了服务获取和服务注册(心跳、续约)两部分内容,我们这里先学习其中服务获取的相关代码,如下所示:
private void initScheduledTasks() {
//服务获取,会通过REST请求从Eureka服务中获取其他Eureka客户端(服务实例)的信息,形成服务实例清单,缓存到本地。
if (clientConfig.shouldFetchRegistry()) {
// 刷新间隔,默认30s
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
//失败重试次数,默认10次
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
//创建线程,用于服务发现和定时更新服务实例信息
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
//执行任务
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
//……
}
根据上述代码,我们可以知道,服务发现的流程如下:
- 首先,根据配置文件中的eureka.client.fetchRegistry配置(默认为true)判断是否进行服务发现。
- 然后,获取registryFetchIntervalSeconds(默认30s)和cacheRefreshExecutorExponentialBackOffBound(默认10)两个配置参数,分别表示从eureka服务器注册表中获取注册信息的时间间隔(s)和 执行程序指数回退刷新的相关属性,是重试延迟的最大倍数值。
- 创建了一个TimedSupervisorTask任务线程,该线程主要实现了对子任务的监控和管理,比如设置线程执行超时时间、设置延期执行时间等
- 在创建TimedSupervisorTask任务线程的时候,传入的scheduler参数,是前面已经创建的,用来执行线程任务的线程池。
- TimedSupervisorTask线程任务主要实现 的任务其实是创建的CacheRefreshThread对象,该对象是一个Runnable接口的实现类对象,所以其中的run()方法就是真正需要执行的任务。
- 在上面的run()方法中,又是通过调用refreshRegistry()方法实现具体逻辑的。
- 最后,调用scheduler.schedule()方法实现了任务的调用。
关于服务发现的具体实现逻辑请参考《Eureka客户端——服务发现》。
6.2、initScheduledTasks()方法——服务续约、注册
initScheduledTasks()方法实现服务续约、注册相关逻辑如下所示:
private void initScheduledTasks() {
//……
//服务注册和续约
if (clientConfig.shouldRegisterWithEureka()) {
//续约时间间隔
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
//失败重试次数
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// 创建心跳服务线程,同时进行服务续约
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
//执行定时任务
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
// 创建注册线程,主要用于服务注册和节点间的数据同步
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
//创建状态监听器,维护响应状态
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
//注册监听器到applicationInfoManager实例对象(默认注册,即判断条件默认为true)
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
//启用服务注册线程
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
根据上述代码,我们可以知道,服务发现的流程如下:
- 根据配置文件中的eureka.client.registerWithEureka配置(默认为true)判断是否进行把当前服务注册到Eureka服务中
- 然后,获取renewalIntervalInSecs和heartbeatExecutorExponentialBackOffBound两个配置参数的值,分别表示续约时间间隔和执行程序指数回退刷新的相关属性,是重试延迟的最大倍数值。
- 然后,和服务发现类似,创建了一个TimedSupervisorTask任务线程,不过该线程中执行的任务是HeartbeatThread类中定义的(主要实现服务续约)
- HeartbeatThread对象是一个Runnable接口的实现类对象,所以其中的run()方法就是真正需要执行的任务,实际执行了renew()方法。
- 然后,调用scheduler.schedule()方法实现了任务的调用,实现服务续约的定时执行。
- 创建InstanceInfoReplicator对象,用于实现服务注册的实现方法,该对象也实现了Runnable接口,主要逻辑也是在run()方法中实现,不过该方法提供了一个start()方法,供外部方法进行调用并启动该线程任务。
- 创建StatusChangeListener监听器,用于监听实例的状态变化,主要通过调用instanceInfoReplicator对象的onDemandUpdate()方法实现状态更新。
- 当配置文件中配置了onDemandUpdateStatusChange=true(默认即为true),则会把上面创建的StatusChangeListener监听器,通过applicationInfoManager对象的registerStatusChangeListener()方法进行注册,即启用监听器。
- 最后,通过调用instanceInfoReplicator的start()方法,启动服务注册的线程。
6.2.1、服务续约
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
在方法中,主要是通过eurekaTransport对象中维护的registrationClient对象,然后调用该对象的sendHeartBeat()方法实现。具体实现请参考《Eureka客户端——服务续约》。
6.2.2、服务注册
实现服务注册的方法主要是通过InstanceInfoReplicator对象实现,该对象实现了Runnable接口,其中实现的run()方法,即实现了服务注册功能。
public void run() {
try {
//刷新实例信息
discoveryClient.refreshInstanceInfo();
//是否有状态更新过了,有的话获取更新的时间
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
//有脏数据,要重新注册
if (dirtyTimestamp != null) {
discoveryClient.register();
//设置更新标记为不更新
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
然后,关于服务注册的详细内容,可以参考《Eureka客户端——服务注册》。
本文地址:https://blog.csdn.net/hou_ge/article/details/111612888