Spring Cloud 2.2.2 源码之九Eureka服务端处理注册服务
程序员文章站
2024-03-19 08:50:40
...
Spring Cloud 2.2.2 源码之九Eureka服务端处理注册服务
大致流程图
服务端接受服务注册
ApplicationResource
的addInstance
用来接受客户端发来的POST
请求:
主要的是最后的注册到注册表,第二个参数是表示是否要进行集群同步复制,因为如果是其他结点同步过来的信息,就不需要再同步给其他结点了,就重复了,如果是客户端发来的信息,那就要给其他节点同步了。
PeerAwareInstanceRegistryImpl的register
先进行注册,然后同步到其他结点。
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);//同步给其他结点
}
AbstractInstanceRegistry的register
首先看注册表有没有,没有的话就创建一个,然后获取是否有相同的实例在了,有的话就进行数据的更新,然后创建一个新的续约,更新事件,最后让读写缓存失效。
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());//注册表获取
REGISTER.increment(isReplication);
if (gMap == null) {//第一次为空
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);//不存在就放入gNewMap
if (gMap == null) {//没映射
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());//获取实例id对应的租约信息
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {//存在的话
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();//取最新的
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {//更新续约数量,注册一个新的实例
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);//创建租约
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);//放进实例名和租约的映射集合
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));//加入到租约注册队列中
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {//覆盖信息不为unknow的要添加到overriddenInstanceStatusMap
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);//设置覆盖信息
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();//记录启动时间
}
registrant.setActionType(ActionType.ADDED);//标记为添加
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();//设置最后更新时间
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());//失效缓存
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
invalidateCache失效缓存
缓存是存在在ResponseCache
中的。
失效很多中缓存,比如增量,全量,服务等。
这些信息都存在读写锁里readWriteCacheMap
,这个是一个谷歌缓存框架提供的类:
PeerAwareInstanceRegistryImpl的replicateToPeers
会同步到除去自己的其他结点中,最后调用replicateInstanceActionsToPeers
:
放在一个批处理的处理器当中发送,最后还是jersey
去完成发送:
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。