欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Apollo源码-长轮询客户端实现

程序员文章站 2022-03-03 09:16:53
前言在更新Spring Cloud Alibaba Nacos时,想到之前阅读过Apollo的源码,便在这插入记录了过来,后续更新Nacos Config源码Apollo简介fork 源码地址 apollo源码参考apollo架构中心设计主要分为 Config Service、Admin Service、Portal、Client 四部分上文介绍到长轮询服务端实现,本文介绍客户端的长轮询实现长轮询感知配置发布-客户端实现从上文中我们知道,服务端暴露了一个将请求挂起60s的接口 /notifi...

前言

在更新Spring Cloud Alibaba Nacos时,想到之前阅读过Apollo的源码,便在这插入记录了过来,后续更新Nacos Config源码

Apollo简介

fork 源码地址 apollo源码
参考apollo架构中心设计
主要分为 Config ServiceAdmin ServicePortalClient 四部分
上文介绍到长轮询服务端实现,本文介绍客户端的长轮询实现

长轮询感知配置发布-客户端实现

从上文中我们知道,服务端暴露了一个将请求挂起60s的接口 /notifications/v2,来供客户端感知服务端配置变化。上文我们只讲了服务端的源码实现,那本文来读一下客户端是如何从configservice读取配置的源码。

我们先定位到代码 RemoteConfigLongPollService#doLongPollingRefresh 感知配置变化,那何处调用了它,我们追踪到 RemoteConfigRepository 类的构造方法

/**
* Constructor.
*
* @param namespace the namespace
*/
public RemoteConfigRepository(String namespace) {
m_namespace = namespace;
m_configCache = new AtomicReference<>();
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
m_longPollServiceDto = new AtomicReference<>();
m_remoteMessages = new AtomicReference<>();
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
m_configNeedForceRefresh = new AtomicBoolean(true);
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
 m_configUtil.getOnErrorRetryInterval() * 8);
gson = new Gson();
//尝试同步配置
this.trySync();
//定时刷新配置
this.schedulePeriodicRefresh();
//长轮询刷新配置  
this.scheduleLongPollingRefresh();
}
  • 构造方法传递一个 namespace 参数,也就是说一个 RemoteConfigRepository 对应一个 namespace

  • schedulePeriodicRefresh 方法每5分钟调用一次 trySync 方法

    而trySync方法内部逻辑在 RemoteConfigLongPollService#doLongPollingRefresh 也用到,我们待会看它时再解析,先接着看长轮询刷新配置方法 this.scheduleLongPollingRefresh();

private void scheduleLongPollingRefresh() {
remoteConfigLongPollService.submit(m_namespace, this);
}

此时,将需要长轮询的 namespaceRemoteConfigRepository 传递给了 RemoteConfigLongPollService 实例:

public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
//注册remoteConfigRepository到内存
boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
if (!m_longPollStarted.get()) {
 //开始长轮询
 startLongPolling();
}
return added;
}
  • 注册 RemoteConfigRepository 到内存
  • 开始长轮询

继续跟踪 startLongPolling 方法

public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
//注册remoteConfigRepository到内存
boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
if (!m_longPollStarted.get()) {
//开始长轮询
startLongPolling();
}
return added;
}

private void startLongPolling() {
if (!m_longPollStarted.compareAndSet(false, true)) {
//already started
return;
}
try {
//读取appId
final String appId = m_configUtil.getAppId();
//读取cluster
final String cluster = m_configUtil.getCluster();
//读取dataCenter
final String dataCenter = m_configUtil.getDataCenter();
//默认2s
final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
//多线程轮询
m_longPollingService.submit(new Runnable() {
 @Override
 public void run() {
   if (longPollingInitialDelayInMills > 0) {
     try {
       logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
       //睡眠2s,让线程消费更慢,可以让大多数任务可以在阻塞队列中去排队
       TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
     } catch (InterruptedException e) {
       //ignore
     }
   }
   //长轮询刷新
   doLongPollingRefresh(appId, cluster, dataCenter);
 }
});
} catch (Throwable ex) {
//回滚标识
m_longPollStarted.set(false);
//日志输出
ApolloConfigException exception =
   new ApolloConfigException("Schedule long polling refresh failed", ex);
Tracer.logError(exception);
logger.warn(ExceptionUtil.getDetailMessage(exception));
}
}
  • 使用 AtomicBoolean 原子类作为长轮询开始标识,若整个过程失败,会回滚该标识

  • 从app.properties或系统参数中读取appId、cluster、dataCenter

  • 多线程执行轮询,每2s睡眠一次,降低cpu压力

    我们来看看多线程轮询的动作 doLongPollingRefresh 方法

private void doLongPollingRefresh(String appId, String cluster, String dataCenter) {
//new 一个随机数
final Random random = new Random();
//appName、instanceId、homepageUrl
ServiceDTO lastServiceDto = null;
//while 可以执行
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
//限流: 每秒2次许可,阻塞线程,5秒内仍获取不到许可,则返回false
if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
 //wait at most 5 seconds
 try {
   //休眠5秒
   TimeUnit.SECONDS.sleep(5);
 } catch (InterruptedException e) {
 }
}
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
String url = null;
try {
 if (lastServiceDto == null) {
   //获取服务端配置
   List<ServiceDTO> configServices = getConfigServices();
   //随机赋值一个
   lastServiceDto = configServices.get(random.nextInt(configServices.size()));
 }

 url =
     assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
         m_notifications);

 logger.debug("Long polling from {}", url);
 HttpRequest request = new HttpRequest(url);
 request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);

 transaction.addData("Url", url);

 final HttpResponse<List<ApolloConfigNotification>> response =
     m_httpUtil.doGet(request, m_responseType);

 logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
 if (response.getStatusCode() == 200 && response.getBody() != null) {
   updateNotifications(response.getBody());
   updateRemoteNotifications(response.getBody());
   transaction.addData("Result", response.getBody().toString());
   notify(lastServiceDto, response.getBody());
 }

 //try to load balance
 if (response.getStatusCode() == 304 && random.nextBoolean()) {
   lastServiceDto = null;
 }

 m_longPollFailSchedulePolicyInSecond.success();
 transaction.addData("StatusCode", response.getStatusCode());
 transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
 lastServiceDto = null;
 Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
 transaction.setStatus(ex);
 long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
 logger.warn(
     "Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
     sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
 try {
   TimeUnit.SECONDS.sleep(sleepTimeInSecond);
 } catch (InterruptedException ie) {
   //ignore
 }
} finally {
 transaction.complete();
}
}
}

代码分三段去看
第一段:

//获取服务端实例信息
List<ServiceDTO> configServices = getConfigServices();
//随机赋值一个
lastServiceDto = configServices.get(random.nextInt(configServices.size()));

执行远端请求configservice的 ServiceController#getConfigService 接口,从Eureka中获取 apollo-configservice 实例信息,并随机一个赋值给ServiceDTO

第二段:

//返回长轮询接口NotificationControllerV2的url
url =
assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
m_notifications);

参数详解:

lastServiceDto.getHomepageUrl() : 获取 configservice 实例的绝对主页URL路径,为其他服务提供信息时使用的路径,默认为null。

m_notifications: 在之前注册 remoteConfigRepository 到内存时初始化了,键为:namespace,值为: -1

第三段

logger.debug("Long polling from {}", url);
HttpRequest request = new HttpRequest(url);
//设置超时时间90 * 1000
request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);

transaction.addData("Url", url);

//发送请求
final HttpResponse<List<ApolloConfigNotification>> response =
   m_httpUtil.doGet(request, m_responseType);

logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
//请求成功
if (response.getStatusCode() == 200 && response.getBody() != null) {
 //修改m_notifications
 updateNotifications(response.getBody());
 //修改m_remoteNotificationMessages
 updateRemoteNotifications(response.getBody());
 transaction.addData("Result", response.getBody().toString());
 //通知配置变化
 notify(lastServiceDto, response.getBody());
}

//304:没有客户端关心的配置变化,再循环一次
//try to load balance
if (response.getStatusCode() == 304 && random.nextBoolean()) {
 lastServiceDto = null;
}

m_longPollFailSchedulePolicyInSecond.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
lastServiceDto = null;
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
transaction.setStatus(ex);
long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
logger.warn(
   "Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
   sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
try {
 TimeUnit.SECONDS.sleep(sleepTimeInSecond);
} catch (InterruptedException ie) {
 //ignore
}
} finally {
transaction.complete();
}
}

关键性代码:

  • updateNotifications 方法,修改了之前 m_notifications 以namespaceName为key,messageId为value

  • updateRemoteNotifications 方法,修改了 m_remoteNotificationMessages 以namespaceName为key, ApolloNotificationMessages对象,该对象的message属性是以key:appId+cluster+namespace ,value:messageId

    notify 方法我们重点拿出来看:

private void notify(ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) {
if (notifications == null || notifications.isEmpty()) {
return;
}
//遍历通知
for (ApolloConfigNotification notification : notifications) {
String namespaceName = notification.getNamespaceName();
//create a new list to avoid ConcurrentModificationException
//获取namespaceName对应的RemoteConfigRepository
List<RemoteConfigRepository> toBeNotified =
   Lists.newArrayList(m_longPollNamespaces.get(namespaceName));
//获取消息
ApolloNotificationMessages originalMessages = m_remoteNotificationMessages.get(namespaceName);
//克隆一份
ApolloNotificationMessages remoteMessages = originalMessages == null ? null : originalMessages.clone();
//since .properties are filtered out by default, so we need to check if there is any listener for it
//因为.properties在默认情况下被过滤掉,所以我们需要检查是否有它的侦听器
//如果m_longPollNamespaces中存在.properties的RemoteConfigRepository,则加入集合
toBeNotified.addAll(m_longPollNamespaces
   .get(String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue())));
//遍历监听仓库集合
for (RemoteConfigRepository remoteConfigRepository : toBeNotified) {
 try {
   //调用onLongPollNotified方法
   remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages);
 } catch (Throwable ex) {
   Tracer.logError(ex);
 }
}
}
}

参数详解:

lastServiceDto : Eureka中configservice实例DTO

notifications : 轮询configservice得到的更新配置集合

上述方法会遍历配置更新集合,获取到配置信息,根据namespace获取长轮询之前就注册到 RemoteConfigLongPollServiceRemoteConfigRepository ,并回调它的 onLongPollNotified 方法,我们接着来看此方法

public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
//保存当前serviceConfig实例
m_longPollServiceDto.set(longPollNotifiedServiceDto);
//保存当前消息
m_remoteMessages.set(remoteMessages);
//丢到线程池中执行
m_executorService.submit(new Runnable() {
@Override
public void run() {
 //标志位
 m_configNeedForceRefresh.set(true);
 //尝试同步
 trySync();
}
});
}

参数详解:

longPollNotifiedServiceDto : 轮询请求到的configservice实例

remoteMessages : 消息对象

看到熟悉的方法了 trySync , 我们之前在看 RemoteConfigRepository 构造方法时,就会去调用一次该方法,方法内部直接调用的是子类的 sync 方法,我们直接看该方法代码实现

@Override
protected synchronized void sync() {
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
try {
//从缓存中获取配置
ApolloConfig previous = m_configCache.get();
//远端请求configservice获取配置
ApolloConfig current = loadApolloConfig();

//reference equals means HTTP 304
//这里的判断是:如果服务端没有配置变化返回304,则current是从m_configCache.get()获取,如果两者不相等,证明有配置变化返回
if (previous != current) {
 logger.debug("Remote Config refreshed!");
 //将配置加入缓存
 m_configCache.set(current);
 //通知监听器,配置变更
 this.fireRepositoryChange(m_namespace, this.getConfig());
}

if (current != null) {
 Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
     current.getReleaseKey());
}

transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}
  • 读取configservice变更配置信息,并加入到缓存

  • 通知监听器配置变更

    loadApolloConfig 方法这里不再跟读了,大致内容是请求 ConfigController#queryConfig 获取appId、cluster、namespace下的apollo配置

流程图总结如下 :
Apollo源码-长轮询客户端实现
留两个问题:通知监听器后,它们干了些什么,configservice是通过怎样的推算知道配置没有发生变化

本文地址:https://blog.csdn.net/m0_37268363/article/details/110494603