欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring Cloud 2.2.2 源码之九Eureka服务端处理注册服务

程序员文章站 2024-03-19 08:50:40
...

大致流程图

Spring Cloud 2.2.2 源码之九Eureka服务端处理注册服务

服务端接受服务注册

ApplicationResourceaddInstance用来接受客户端发来的POST请求:
Spring Cloud 2.2.2 源码之九Eureka服务端处理注册服务
主要的是最后的注册到注册表,第二个参数是表示是否要进行集群同步复制,因为如果是其他结点同步过来的信息,就不需要再同步给其他结点了,就重复了,如果是客户端发来的信息,那就要给其他节点同步了。
Spring Cloud 2.2.2 源码之九Eureka服务端处理注册服务

PeerAwareInstanceRegistryImpl的register

先进行注册,然后同步到其他结点。

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);//同步给其他结点
    }

AbstractInstanceRegistry的register

首先看注册表有没有,没有的话就创建一个,然后获取是否有相同的实例在了,有的话就进行数据的更新,然后创建一个新的续约,更新事件,最后让读写缓存失效。

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());//注册表获取
            REGISTER.increment(isReplication);
            if (gMap == null) {//第一次为空
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);//不存在就放入gNewMap
                if (gMap == null) {//没映射
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());//获取实例id对应的租约信息
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {//存在的话
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();//取最新的
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {//更新续约数量,注册一个新的实例
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);//创建租约
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);//放进实例名和租约的映射集合
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));//加入到租约注册队列中
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {//覆盖信息不为unknow的要添加到overriddenInstanceStatusMap
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);//设置覆盖信息
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();//记录启动时间
            }
            registrant.setActionType(ActionType.ADDED);//标记为添加
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();//设置最后更新时间
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());//失效缓存
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }

invalidateCache失效缓存

缓存是存在在ResponseCache中的。
Spring Cloud 2.2.2 源码之九Eureka服务端处理注册服务
失效很多中缓存,比如增量,全量,服务等。
Spring Cloud 2.2.2 源码之九Eureka服务端处理注册服务
这些信息都存在读写锁里readWriteCacheMap,这个是一个谷歌缓存框架提供的类:
Spring Cloud 2.2.2 源码之九Eureka服务端处理注册服务

PeerAwareInstanceRegistryImpl的replicateToPeers

会同步到除去自己的其他结点中,最后调用replicateInstanceActionsToPeers
Spring Cloud 2.2.2 源码之九Eureka服务端处理注册服务
放在一个批处理的处理器当中发送,最后还是jersey去完成发送:
Spring Cloud 2.2.2 源码之九Eureka服务端处理注册服务

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。