欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【SpringCloud微服务】第3章 服务治理SpringCloudEureka(五)——Eureka源码分析

程序员文章站 2022-06-12 19:13:45
...

2.8 Eureka 源码分析

  首先,对于服务注册中心、服务提供者、服务消费者这三个主要元素来说,后两者(也就是Eureka客户端)在整个运行机制中是大部分通信行为的主动发起者,而注册中心主要是处理请求的接受者。所以,我们可以从Eureka的客户端作为入口看看它是如何完成这些主动通信行为的。

  我们在将一个普通的Spring Boot应用注册到Eureka Server或是从Eureka Server中获取服务列表时,主要就做了两件事:

①在应用主类中配置了@EnableDiscoveryClient注解
②在application.properties中用eureka.client.serviceUrl.defaultZone参数指定了服务中心的位置。

  顺着上面的线索,我们来看看@EnableDiscoveryClient的源码如下:

/**
 * Annotation to enable a DiscoveryClient implementation.
 * 用于开启DiscoveryClient的实例
 * @author Spencer Gibb
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

	/**
	 * If true, the ServiceRegistry will automatically register the local server.
	 */
	boolean autoRegister() default true;
}

  从该注解的注释,我们知道,它主要用来开启DiscoveryClient的实例。通过搜索DiscoveryClient,我们发现有一个类和一个接口。通过梳理可以得到如下图所示的关系:

  【SpringCloud微服务】第3章 服务治理SpringCloudEureka(五)——Eureka源码分析

  ①左边的org.springframework.cloud.client.discovery.DiscoveryClient是Spring Cloud的接口,它定义了用来发现服务的常用抽象方法,通过该接口可以有效地屏蔽服务治理的实现细节,所以使用Spring Cloud构建的微服务可以方便地切换不同服务治理框架,而不改动程序代码,只需要另外添加一些针对服务治理框架的配置即可;

  ②左边的org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient是对该接口的实现,从命名来判断,它实现的是对Eureka发现服务的封装,同时底层又依赖于EurekaClient;

  ③com.netflix.discovery.EurekaClient接口,又继承了com.netflix.discovery.shared.LookupService接口,它们都是Netflix开源包中的内容,主要定义了针对Eureka的发现服务的抽象方法,而真正实现发现服务的则是Netflix包中的com.netflix.discovery.DiscoveryClient类。

  接下来,我们就来详细看看DiscoveryClient类吧。先解读一下该类头部的注解,源码如下:

/**
 * The class that is instrumental for interactions with <tt>Eureka Server</tt>.(这个类与Eureka Server相互协作)
 * 
 * <p>
 * <tt>Eureka Client</tt> is responsible for a) <em>Registering</em> the
 * instance with <tt>Eureka Server</tt> b) <em>Renewal</em>of the lease with
 * <tt>Eureka Server</tt> c) <em>Cancellation</em> of the lease from
 * <tt>Eureka Server</tt> during shutdown
 * <p>
 * d) <em>Querying</em> the list of services/instances registered with
 * <tt>Eureka Server</tt>
 * <p>
 * (Eureka Client 负责一下任务:
 *  a.向Eureka Server注册服务实例;
 *  b.向Eureka Server服务续约;
 *  c.当服务关闭期间,向Eureka Server取消租约;
 *  d.查询Eureka Server 中的服务实例列表。)
 * <p>
 * <tt>Eureka Client</tt> needs a configured list of <tt>Eureka Server</tt>
 * (Eureka Client还需要配置一个Eureka Server的URL列表)
 * {@link java.net.URL}s to talk to.These {@link java.net.URL}s are typically amazon elastic eips
 * which do not change. All of the functions defined above fail-over to other
 * {@link java.net.URL}s specified in the list in the case of failure.
 * </p>
 *
 * @author Karthik Ranganathan, Greg Kim
 * @author Spencer Gibb
 *
 */
@Singleton
public class DiscoveryClient implements EurekaClient {
     //细节省略。。。。
}

  在具体研究Eureka Client负责完成的任务之前,我们先看看在哪里对Eureka Server的URL列表进行配置。根据我们配置的属性名eureka.client.serviceUrl.defaultZone,通过serviceUrl可以找到该属性相关的加载属性,对应DiscoveryClient类中的源码部分为:

    /**
     * @deprecated see replacement in {@link com.netflix.discovery.endpoint.EndpointUtils}
     *
     * Get the list of all eureka service urls from properties file for the eureka client to talk to.
     *
     * @param instanceZone The zone in which the client resides
     * @param preferSameZone true if we have to prefer the same zone as the client, false otherwise
     * @return The list of all eureka service urls for the eureka client to talk to
     */
    @Deprecated
    @Override
    public List<String> getServiceUrlsFromConfig(String instanceZone, boolean preferSameZone) {
        return EndpointUtils.getServiceUrlsFromConfig(clientConfig, instanceZone, preferSameZone);
    }

  我们发现,该方法已经废弃了,被标注为@Deprecated,并@link到了替代类com.netflix.discovery.endpoint.EndpointUtils,所以我们可以在EndpointUtils类中找到下面这个函数:

    /**
     * Get the list of all eureka service urls from properties file for the eureka client to talk to.
     *
     * @param clientConfig the clientConfig to use
     * @param instanceZone The zone in which the client resides
     * @param preferSameZone true if we have to prefer the same zone as the client, false otherwise
     * @return The list of all eureka service urls for the eureka client to talk to
     */
    public static List<String> getServiceUrlsFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
        List<String> orderedUrls = new ArrayList<String>();
        String region = getRegion(clientConfig);
        String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
        if (availZones == null || availZones.length == 0) {
            availZones = new String[1];
            availZones[0] = DEFAULT_ZONE;
        }
        logger.debug("The availability zone for the given region {} are {}", region, availZones);
        int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);

        List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]);
        if (serviceUrls != null) {
            orderedUrls.addAll(serviceUrls);
        }
        int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1);
        while (currentOffset != myZoneOffset) {
            serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[currentOffset]);
            if (serviceUrls != null) {
                orderedUrls.addAll(serviceUrls);
            }
            if (currentOffset == (availZones.length - 1)) {
                currentOffset = 0;
            } else {
                currentOffset++;
            }
        }

        if (orderedUrls.size() < 1) {
            throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");
        }
        return orderedUrls;
    }

Region、Zone

  在上面的函数中,可以方法,客户端依次加载了两个内容,第一个是Region,第二个是Zone,从其加载逻辑上我们可以判断它们之间的关系:

     ①通过getRegion函数,我们可以看到它配置中读取了一个Region的返回值,所以一个微服务应用只可以属于一个Region,如果不特殊配置,默认为default。若我们要自己设置,可以通过eureka.client.region属性来定义。

    /**
     * Get the region that this particular instance is in.
     *
     * @return - The region in which the particular instance belongs to.
     */
    public static String getRegion(EurekaClientConfig clientConfig) {
        String region = clientConfig.getRegion();
        if (region == null) {
            region = DEFAULT_REGION;
        }
        region = region.trim().toLowerCase();
        return region;
    }

     ②通过getAvailabilityZones函数,可以知道当我们没有特别为Region配置Zone的时候,将默认采用defaultZone,这也是我们之前配置参数eureka.client.serviceUrl.defaultZone的由来。若要为应用指定Zone,可以通过eureka.client.vailability-zones属性来进行设置。从该函数的return内容,我们可以知道Zone能够设置多个,并且通过逗号分隔来配置。由此,我们可以判断Region与Zone是一对多的关系。

	public String[] getAvailabilityZones(String region) {
		String value = this.availabilityZones.get(region);
		if (value == null) {
			value = DEFAULT_ZONE;
		}
		return value.split(",");
	}

     ③在获取了Region和Zone的消息之后,才开始真正加载Eureka Server的具体地址。它根据传入的参数按一定算法确定加载位于哪一个Zone配置的serviceUrls。

int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
String zone = availZones[myZoneOffset];
List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);

  具体获取serviceUrls的实现,我们可以详细查看getEurekaServerServiceUrls函数的具体实现类EurekaClientConfigBean,用来加载配置文件中的内容,这里有许多非常有用的信息,我们先说一下此处我们关心的,关于defaultZone的信息。通过搜索defaultZone,我们可以很容易找到下面这个函数,它具体实现了如何解析该参数的过程,通过此内容,我们就可以知道,eureka.client.serviceUrl.defaultZone属性可以配置多个,并且需要通过逗号分隔。

	@Override
	public List<String> getEurekaServerServiceUrls(String myZone) {
		String serviceUrls = this.serviceUrl.get(myZone);
		if (serviceUrls == null || serviceUrls.isEmpty()) {
			serviceUrls = this.serviceUrl.get(DEFAULT_ZONE);
		}
		if (!StringUtils.isEmpty(serviceUrls)) {
            //StringUtils.commaDelimitedListToStringArray(str)逗号分隔字符串
			final String[] serviceUrlsSplit = StringUtils.commaDelimitedListToStringArray(serviceUrls);
			List<String> eurekaServiceUrls = new ArrayList<>(serviceUrlsSplit.length);
			for (String eurekaServiceUrl : serviceUrlsSplit) {
				if (!endsWithSlash(eurekaServiceUrl)) {
					eurekaServiceUrl += "/";
				}
				eurekaServiceUrls.add(eurekaServiceUrl);
			}
			return eurekaServiceUrls;
		}

		return new ArrayList<>();
	}

  当我们在微服务应用中使用Ribbon来实现服务调用时,对于Zone的设置可以在负载均衡时实现区域亲和特征:Ribbon的默认策略会优先访问同客户端处于一个Zone中的实例。所以通过Zone属性的定义,配合实际部署的物理结构,我们就可以有效地设计出对区域性故障的容错集群。

服务注册

  在理解了多个服务注册中心信息的加载后,我们再回头看看DiscoveryClient类是如何实现“服务注册”行为的,通过查看它的构造类,可以找到它调用了下面这个函数:

private void initScheduledTasks() {
    // 。。。省略部分。。。
// InstanceInfo replicator
if (clientConfig.shouldRegisterWithEureka()) {
        instanceInfoReplicator = new InstanceInfoReplicator(
          this,
          instanceInfo,
          clientConfig.getInstanceInfoReplicationIntervalSeconds(),
          2); // burstSize
    // 。。。省略部分。。。
    } else {
          logger.info("Not registering with Eureka server per configuration");
    }
}

  从上面的函数中,可以看到一个与服务注册相关的判断语句if (clientConfig.shouldRegisterWithEureka()) 。在该分支内,创建了一个InstanceInfoReplicator类的实例,它会执行一个定时任务,而这个定时任务的具体工作可以查看该类的run()函数,具体如下:

    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

  相信大家都发现了 discoveryClient.register();这一行,真正触发调用注册的地方就在这里。继续看register()的实现内容,如下:

    /**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

  通过属性命名,大家基本也能猜出来,注册操作也是通过REST请求的方式进行的。同时,我们能看到发起注册请求的时候,传入一个instanceInfo对象,该对象就是注册时,客户端给服务端的服务的元数据。

服务获取与服务续约

  顺着上面的思路,我们继续来看DiscoveryClient的initScheduledTasks函数,不难发现在其中还有两个定时任务,分别是“服务获取”和“服务续约”:

    /**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
     // 。。。省略。。。
    }

  从源码中我们可以发现,“服务获取”任务相对于“服务续约”和“服务注册”任务更为独立。“服务续约”与“服务注册”在同一个if逻辑中,这个不难理解,服务注册到Eureka Server后,自然需要一个心跳去续约,防止被踢除,所以它们肯定是成对出现的。从源码中,我们更清楚地看到了之前所提到的,对于服务续约相关的时间控制参数:

eureka.instance.lease-renewal-interval-in-seconds=30
eureka.instance.lease-expiration-duration-in-seconds=90

  而“服务获取”的逻辑在独立的一个if判断中,其判断依据就是我们之前所提到的eureka.client.fetch-registry=true参数,它默认为true,大部分情况下我们不需要关心。为了定期更新客户端的服务清单,以保证客户端能够访问确实健康的服务实例,“服务获取”的请求不会只局限于服务启动,而是一个定时执行的任务,从源码中,我们可以看到任务运行中的registryFetchIntervalSeconds参数对应的就是之前所提到的eureka.client.registry-fetch-interval-seconds=30配置参数,它默认为30秒。

  继续向下深入,我们能分别发现实现“服务获取”和“服务续约”的具体方法,其中“服务续约”的实现较为简单,直接REST请求的方式进行续约:

    /**
     * Renew with the eureka service by making the appropriate REST call
     */
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

  而“服务获取”则复杂一些,会更具是否是第一次获取发起不同的REST请求和相应的处理。具体的实现逻辑跟之前类似,有兴趣的读者可以继续查看客户端的其他具体内容,以了解更多细节。

服务注册中心处理

  通过上面的源码分析,可以看到所有的交互都是通过REST请求来发起的。下面我们来看看服务注册中心对这些请求的处理。Eureka Server对于各类REST请求的定义都位于com.netflix.eureka.resources包下。

  以“服务注册”请求为例:

    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        //....
        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

  在对注册信息进行了一堆校验之后,会调用com.netflix.eureka.registry.PeerAwareInstanceRegistry.register(InstanceInfo, boolean)函数来进行服务注册:

    /**
     * Registers the information about the {@link InstanceInfo} and replicates
     * this information to all peer eureka nodes. If this is replication event
     * from other replica nodes then it is not replicated.
     *
     * @param info
     *            the {@link InstanceInfo} to be registered and replicated.
     * @param isReplication
     *            true if this is a replication event from other replica nodes,
     *            false otherwise.
     */
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

  注册实例的真正存储是一个Map,这个Map的key为服务的AppName, value为该AppName的实例集合Map, 实例集合的key为注册服务的实例id, value为 Lease, Lease的概念为租期, 租期到期的话则该服务实例会被过期剔除, 续期(心跳)可配置在以下参数: 

# 服务过期时间配置,超过这个时间没有接收到心跳EurekaServer就会将这个实例剔除
eureka.instance.leaseExpirationDurationInSeconds = 90s (默认90s) 
#服务刷新时间配置,每隔这个时间会主动心跳一次
eureka.instance.leaseRenewalIntervalInSeconds = 30s (默认30s)

   源码如下:

	/**
	 * 注册中心真正存储服务实例信息的是一个ConcurrentHashMap
	 */
	private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
	        = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
	public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
	    try {
	        //key为appName, 也就是以spring.application.name的大写字符
	        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
	        //如果该AppName的实例集合不存在
	        if (gMap == null) {
	            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap =
	                         new ConcurrentHashMap<String, Lease<InstanceInfo>>();
	            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
	            if (gMap == null) {
	                gMap = gNewMap;
	            }
	        }
	        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
	        // ...
	        gMap.put(registrant.getId(), lease);
	    }
	    // ...
	}