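Apollo Source Code: Client-Side Long Polling Implementation
Preface
While updating my Spring Cloud Alibaba Nacos notes, I remembered that I had previously read the Apollo source code, so I am recording those notes here. The Nacos Config source code will follow in a later update.
Introduction to Apollo
Forked source repository: apollo source code
Reference: Apollo architecture design
Apollo consists of four main parts: Config Service, Admin Service, Portal, and Client.
The previous article covered the server-side long polling implementation; this article covers the client side.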
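Detecting Config Releases via Long Polling: Client-Side Implementation
As the previous article showed, the server exposes an endpoint, /notifications/v2, that holds a request for up to 60s so that clients can detect configuration changes on the server. That article covered only the server-side source; this one reads through how the client fetches configuration from configservice.
We start at RemoteConfigLongPollService#doLongPollingRefresh, which detects configuration changes. Tracing where it is invoked from leads to the constructor of the RemoteConfigRepository class: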
/**
 * Constructor.
 *
 * @param namespace the namespace
 */
public RemoteConfigRepository(String namespace) {
    m_namespace = namespace;
    m_configCache = new AtomicReference<>();
    m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
    m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
    remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
    m_longPollServiceDto = new AtomicReference<>();
    m_remoteMessages = new AtomicReference<>();
    m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
    m_configNeedForceRefresh = new AtomicBoolean(true);
    m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
        m_configUtil.getOnErrorRetryInterval() * 8);
    gson = new Gson();
    // try to sync the configuration once
    this.trySync();
    // refresh the configuration periodically
    this.schedulePeriodicRefresh();
    // refresh the configuration via long polling
    this.scheduleLongPollingRefresh();
}
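- The constructor takes a namespace parameter, so each RemoteConfigRepository corresponds to exactly one namespace.
- schedulePeriodicRefresh calls trySync every 5 minutes. The logic inside trySync is also reached from RemoteConfigLongPollService#doLongPollingRefresh, so we will analyze it when we get there.
First, let's continue with the long polling refresh method, this.scheduleLongPollingRefresh():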
private void scheduleLongPollingRefresh() {
    remoteConfigLongPollService.submit(m_namespace, this);
}
At this point, the namespace to be long-polled and the RemoteConfigRepository itself are handed to the RemoteConfigLongPollService instance:
public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
    // register the remoteConfigRepository in memory
    boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
    m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
    if (!m_longPollStarted.get()) {
        // start long polling
        startLongPolling();
    }
    return added;
}
- Register the RemoteConfigRepository in memory
- Start long polling
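The two steps above can be sketched with JDK classes only. This is a minimal illustration, not Apollo's code: the class name, the Runnable listener type, and the startCount counter are hypothetical, and a map of lists stands in for the multimap-like m_longPollNamespaces.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the registration part of RemoteConfigLongPollService.
final class LongPollRegistrySketch {
    static final long INIT_NOTIFICATION_ID = -1L;

    // namespace -> listeners registered for that namespace (assumed structure)
    private final Map<String, List<Runnable>> listeners = new ConcurrentHashMap<>();
    // namespace -> last notification id seen by this client
    final Map<String, Long> notifications = new ConcurrentHashMap<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    // Counts how many times the poll loop would have been launched
    final AtomicInteger startCount = new AtomicInteger();

    boolean submit(String namespace, Runnable listener) {
        boolean added = listeners
            .computeIfAbsent(namespace, k -> new CopyOnWriteArrayList<>())
            .add(listener);
        // Only the first registration of a namespace sets the initial id
        notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
        // compareAndSet lets exactly one caller win, even under concurrent submits
        if (started.compareAndSet(false, true)) {
            startCount.incrementAndGet(); // a real service would launch the poll loop here
        }
        return added;
    }
}
```

Registering several namespaces therefore shares one poll loop; each namespace only adds an entry to the two maps.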
Next, we follow the startLongPolling method:
private void startLongPolling() {
    if (!m_longPollStarted.compareAndSet(false, true)) {
        //already started
        return;
    }
    try {
        // read appId
        final String appId = m_configUtil.getAppId();
        // read cluster
        final String cluster = m_configUtil.getCluster();
        // read dataCenter
        final String dataCenter = m_configUtil.getDataCenter();
        // 2s by default
        final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
        // submit the polling task to the long polling executor
        m_longPollingService.submit(new Runnable() {
            @Override
            public void run() {
                if (longPollingInitialDelayInMills > 0) {
                    try {
                        logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
                        // one-time initial delay before the first poll (2s by default)
                        TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
                    } catch (InterruptedException e) {
                        //ignore
                    }
                }
                // long polling refresh loop
                doLongPollingRefresh(appId, cluster, dataCenter);
            }
        });
    } catch (Throwable ex) {
        // roll back the started flag
        m_longPollStarted.set(false);
        // log the failure
        ApolloConfigException exception =
            new ApolloConfigException("Schedule long polling refresh failed", ex);
        Tracer.logError(exception);
        logger.warn(ExceptionUtil.getDetailMessage(exception));
    }
}
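- An AtomicBoolean serves as the "long polling started" flag; if scheduling fails, the flag is rolled back.
- appId, cluster, and dataCenter are read from app.properties or system properties.
- The polling task is submitted to an executor. Before the first poll it sleeps once for the initial delay (2s by default). This is a one-time startup delay, not a pause between polls: the sleep sits outside the polling loop, and pacing inside the loop comes from the rate limiter and from the server holding each request.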
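The flag handling in the first bullet can be isolated into a small sketch. The class and method names here are hypothetical; only the compareAndSet-then-rollback shape is taken from startLongPolling.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper showing the "start once, roll back on failure" pattern.
final class StartOnceSketch {
    private final AtomicBoolean started = new AtomicBoolean(false);

    /** Returns true only if this call actually scheduled the task. */
    boolean startOnce(ExecutorService executor, Runnable pollLoop) {
        if (!started.compareAndSet(false, true)) {
            return false; // another caller already started it
        }
        try {
            executor.submit(pollLoop);
            return true;
        } catch (RuntimeException ex) {
            // Scheduling failed: reset the flag so a later call can retry
            started.set(false);
            return false;
        }
    }

    boolean isStarted() {
        return started.get();
    }
}
```

Without the rollback, a single scheduling failure would leave the flag set to true and no later submit could ever start the loop.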
Now let's look at what the polling task actually does, the doLongPollingRefresh method:
private void doLongPollingRefresh(String appId, String cluster, String dataCenter) {
    // random source used to pick an instance
    final Random random = new Random();
    // ServiceDTO carries appName, instanceId and homepageUrl
    ServiceDTO lastServiceDto = null;
    // loop until stopped or the thread is interrupted
    while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
        // rate limit (2 permits per second): block up to 5 seconds, false if no permit was obtained
        if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
            //wait at most 5 seconds
            try {
                // sleep for 5 seconds
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
            }
        }
        Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
        String url = null;
        try {
            if (lastServiceDto == null) {
                // fetch the list of configservice instances
                List<ServiceDTO> configServices = getConfigServices();
                // pick one at random
                lastServiceDto = configServices.get(random.nextInt(configServices.size()));
            }
            url =
                assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
                    m_notifications);
            logger.debug("Long polling from {}", url);
            HttpRequest request = new HttpRequest(url);
            request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
            transaction.addData("Url", url);
            final HttpResponse<List<ApolloConfigNotification>> response =
                m_httpUtil.doGet(request, m_responseType);
            logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
            if (response.getStatusCode() == 200 && response.getBody() != null) {
                updateNotifications(response.getBody());
                updateRemoteNotifications(response.getBody());
                transaction.addData("Result", response.getBody().toString());
                notify(lastServiceDto, response.getBody());
            }
            //try to load balance
            if (response.getStatusCode() == 304 && random.nextBoolean()) {
                lastServiceDto = null;
            }
            m_longPollFailSchedulePolicyInSecond.success();
            transaction.addData("StatusCode", response.getStatusCode());
            transaction.setStatus(Transaction.SUCCESS);
        } catch (Throwable ex) {
            lastServiceDto = null;
            Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
            transaction.setStatus(ex);
            long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
            logger.warn(
                "Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
                sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
            try {
                TimeUnit.SECONDS.sleep(sleepTimeInSecond);
            } catch (InterruptedException ie) {
                //ignore
            }
        } finally {
            transaction.complete();
        }
    }
}
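The failure branch above sleeps for whatever m_longPollFailSchedulePolicyInSecond.fail() returns, and the success path resets the policy via success(). The policy's internals are not shown in this article, so the following is only a sketch of one plausible exponential policy. It assumes the delay doubles on each consecutive failure up to an upper bound; the class name and the bounds used in the usage note are hypothetical.

```java
// Hypothetical exponential backoff policy with fail()/success() semantics.
final class BackoffSketch {
    private final long lowerBound;
    private final long upperBound;
    private long lastDelay; // 0 means "no failure since the last success"

    BackoffSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    /** Records a failure and returns how long to wait before the next attempt. */
    long fail() {
        lastDelay = (lastDelay == 0) ? lowerBound : Math.min(lastDelay * 2, upperBound);
        return lastDelay;
    }

    /** Records a success, so the next failure starts again from the lower bound. */
    void success() {
        lastDelay = 0;
    }
}
```

With bounds of 1 and 8, consecutive failures would wait 1, 2, 4, 8, 8 seconds, and one successful poll drops the next failure back to 1.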
We will read this code in three parts.
Part 1:
// fetch the configservice instance list
List<ServiceDTO> configServices = getConfigServices();
// pick one at random
lastServiceDto = configServices.get(random.nextInt(configServices.size()));
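This issues a remote request to configservice's ServiceController#getConfigService endpoint, which returns the apollo-configservice instances registered in Eureka. One of them is chosen at random and assigned to lastServiceDto.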
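The selection is sticky: the chosen instance is reused across loop iterations until a failure, or a 304 combined with a coin flip, clears it. A minimal sketch of that behaviour follows, with plain URL strings standing in for ServiceDTO and all names hypothetical.

```java
import java.util.List;
import java.util.Random;

// Hypothetical model of how the poll loop keeps or drops its chosen instance.
final class StickyPickSketch {
    private final Random random;
    private String current; // null means "pick again on the next round"

    StickyPickSketch(Random random) {
        this.random = random;
    }

    String pick(List<String> instances) {
        if (current == null) {
            current = instances.get(random.nextInt(instances.size()));
        }
        return current;
    }

    void onResponse(int statusCode) {
        // On 304, drop the sticky choice about half the time to spread load
        if (statusCode == 304 && random.nextBoolean()) {
            current = null;
        }
    }

    void onFailure() {
        current = null; // always re-pick after an error
    }
}
```

A 200 response keeps the current instance, so a client that is receiving notifications stays on the same server.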
Part 2:
// build the URL of the long polling endpoint served by NotificationControllerV2
url =
    assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
        m_notifications);
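Parameter details:
lastServiceDto.getHomepageUrl(): the absolute home page URL of the configservice instance, i.e. the address it advertises to other services; defaults to null. It is used here as the base of the long polling URL.
m_notifications: initialized earlier, when the remoteConfigRepository was registered in memory. The key is the namespace and the initial value is -1.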
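To make the shape of the resulting URL concrete, here is a rough sketch of how such a URL could be assembled. The body of assembleLongPollRefreshUrl is not shown in this article, so the query parameter names (appId, cluster, notifications) and the JSON field names (namespaceName, notificationId) are assumptions, and dataCenter is left out for brevity.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical URL builder; parameter and field names are assumed, not taken from Apollo.
final class LongPollUrlSketch {
    static String assemble(String homepageUrl, String appId, String cluster,
                           Map<String, Long> notifications) {
        // Serialize the namespace -> id map as a small JSON array by hand
        StringJoiner json = new StringJoiner(",", "[", "]");
        for (Map.Entry<String, Long> e : notifications.entrySet()) {
            json.add("{\"namespaceName\":\"" + e.getKey()
                + "\",\"notificationId\":" + e.getValue() + "}");
        }
        String base = homepageUrl.endsWith("/") ? homepageUrl : homepageUrl + "/";
        return base + "notifications/v2?appId=" + enc(appId)
            + "&cluster=" + enc(cluster)
            + "&notifications=" + enc(json.toString());
    }

    private static String enc(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            throw new IllegalStateException(ex); // UTF-8 is always available
        }
    }
}
```

Sending the last known id for every namespace is what lets the server decide whether there is anything new to report for this client.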
Part 3:
        logger.debug("Long polling from {}", url);
        HttpRequest request = new HttpRequest(url);
        // read timeout: 90 * 1000 ms
        request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
        transaction.addData("Url", url);
        // send the request
        final HttpResponse<List<ApolloConfigNotification>> response =
            m_httpUtil.doGet(request, m_responseType);
        logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
        // the request succeeded and carries notifications
        if (response.getStatusCode() == 200 && response.getBody() != null) {
            // update m_notifications
            updateNotifications(response.getBody());
            // update m_remoteNotificationMessages
            updateRemoteNotifications(response.getBody());
            transaction.addData("Result", response.getBody().toString());
            // notify listeners of the configuration change
            notify(lastServiceDto, response.getBody());
        }
        // 304: no change in the configuration this client cares about; loop again
        //try to load balance
        if (response.getStatusCode() == 304 && random.nextBoolean()) {
            lastServiceDto = null;
        }
        m_longPollFailSchedulePolicyInSecond.success();
        transaction.addData("StatusCode", response.getStatusCode());
        transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
        lastServiceDto = null;
        Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
        transaction.setStatus(ex);
        long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
        logger.warn(
            "Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
            sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
        try {
            TimeUnit.SECONDS.sleep(sleepTimeInSecond);
        } catch (InterruptedException ie) {
            //ignore
        }
    } finally {
        transaction.complete();
    }
}
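Key code:
- updateNotifications updates the m_notifications map from earlier, with namespaceName as the key and messageId as the value.
- updateRemoteNotifications updates m_remoteNotificationMessages, with namespaceName as the key and an ApolloNotificationMessages object as the value. That object's message map uses appId+cluster+namespace as the key and messageId as the value.
The notify method deserves a closer look: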
private void notify(ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) {
    if (notifications == null || notifications.isEmpty()) {
        return;
    }
    // iterate over the notifications
    for (ApolloConfigNotification notification : notifications) {
        String namespaceName = notification.getNamespaceName();
        //create a new list to avoid ConcurrentModificationException
        // look up the RemoteConfigRepository instances registered for namespaceName
        List<RemoteConfigRepository> toBeNotified =
            Lists.newArrayList(m_longPollNamespaces.get(namespaceName));
        // look up the notification messages
        ApolloNotificationMessages originalMessages = m_remoteNotificationMessages.get(namespaceName);
        // work on a clone
        ApolloNotificationMessages remoteMessages = originalMessages == null ? null : originalMessages.clone();
        //since .properties are filtered out by default, so we need to check if there is any listener for it
        // if m_longPollNamespaces holds repositories under "<namespace>.properties", add them as well
        toBeNotified.addAll(m_longPollNamespaces
            .get(String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue())));
        // iterate over the repositories to notify
        for (RemoteConfigRepository remoteConfigRepository : toBeNotified) {
            try {
                // call back onLongPollNotified
                remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages);
            } catch (Throwable ex) {
                Tracer.logError(ex);
            }
        }
    }
}
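Parameter details:
lastServiceDto: the DTO of the configservice instance obtained from Eureka
notifications: the list of configuration change notifications returned by polling configservice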
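The fan-out in notify has three parts: copy the listener list before iterating, also include listeners registered under the ".properties" suffix, and isolate each callback so that one failure does not stop the rest. A self-contained sketch with hypothetical names, using Consumer<String> in place of RemoteConfigRepository:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical fan-out helper modelled on the notify method above.
final class NotifyFanOutSketch {
    final Map<String, List<Consumer<String>>> listeners = new ConcurrentHashMap<>();

    /** Notifies all listeners of a namespace and returns how many callbacks succeeded. */
    int notifyNamespace(String namespaceName) {
        // Copy first so registrations during iteration cannot break the loop
        List<Consumer<String>> toBeNotified =
            new ArrayList<>(listeners.getOrDefault(namespaceName, Collections.emptyList()));
        // Listeners registered under "<namespace>.properties" are included as well
        toBeNotified.addAll(
            listeners.getOrDefault(namespaceName + ".properties", Collections.emptyList()));
        int delivered = 0;
        for (Consumer<String> listener : toBeNotified) {
            try {
                listener.accept(namespaceName);
                delivered++;
            } catch (RuntimeException ex) {
                // One failing listener must not block the others
            }
        }
        return delivered;
    }
}
```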
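The method above iterates over the change notifications and, for each namespace, looks up the RemoteConfigRepository instances that were registered with RemoteConfigLongPollService before long polling started, then calls back their onLongPollNotified method. Let's look at that method next: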
public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
    // remember the configservice instance that returned the notification
    m_longPollServiceDto.set(longPollNotifiedServiceDto);
    // remember the current messages
    m_remoteMessages.set(remoteMessages);
    // hand the work over to the thread pool
    m_executorService.submit(new Runnable() {
        @Override
        public void run() {
            // force-refresh flag
            m_configNeedForceRefresh.set(true);
            // try to sync
            trySync();
        }
    });
}
Parameter details:
longPollNotifiedServiceDto: the configservice instance that answered the long poll
remoteMessages: the notification messages object
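The point of submitting to an executor is that the long polling thread only records state and returns, while the potentially slow sync runs elsewhere. A small sketch of that hand-off, with hypothetical names and a plain Runnable standing in for trySync:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of onLongPollNotified: record state, then sync asynchronously.
final class NotifyHandOffSketch {
    final AtomicReference<String> lastNotifiedService = new AtomicReference<>();
    final AtomicBoolean needForceRefresh = new AtomicBoolean(false);
    private final ExecutorService executor;
    private final Runnable trySync;

    NotifyHandOffSketch(ExecutorService executor, Runnable trySync) {
        this.executor = executor;
        this.trySync = trySync;
    }

    Future<?> onLongPollNotified(String serviceUrl) {
        // Cheap bookkeeping happens on the caller's (long polling) thread
        lastNotifiedService.set(serviceUrl);
        // The sync itself runs on the executor so the poll loop is not blocked
        return executor.submit(() -> {
            needForceRefresh.set(true);
            trySync.run();
        });
    }
}
```

Returning the Future is an addition for testability here; the method in the source returns void.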
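Here is a familiar method: trySync. As we saw earlier, the RemoteConfigRepository constructor already calls it once. Internally it delegates directly to the subclass's sync method, so let's go straight to that implementation: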
@Override
protected synchronized void sync() {
    Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
    try {
        // read the cached configuration
        ApolloConfig previous = m_configCache.get();
        // request the configuration from configservice
        ApolloConfig current = loadApolloConfig();
        //reference equals means HTTP 304
        // if the server reports no change (304), current is the instance from m_configCache.get();
        // a different reference therefore means the configuration changed
        if (previous != current) {
            logger.debug("Remote Config refreshed!");
            // store the configuration in the cache
            m_configCache.set(current);
            // notify listeners of the configuration change
            this.fireRepositoryChange(m_namespace, this.getConfig());
        }
        if (current != null) {
            Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
                current.getReleaseKey());
        }
        transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
        transaction.setStatus(ex);
        throw ex;
    } finally {
        transaction.complete();
    }
}
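- Read the changed configuration from configservice and store it in the cache
- Notify listeners of the configuration change
We will not trace loadApolloConfig any further here. In short, it calls ConfigController#queryConfig to fetch the Apollo configuration for the given appId, cluster, and namespace.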
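The reference comparison in sync only works because a 304 makes the loader hand back the very same cached object. The sketch below models that contract with hypothetical types: a Supplier that returns null stands in for an HTTP 304, and a counter stands in for fireRepositoryChange.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical config snapshot; only object identity matters for the comparison.
final class ConfigSnapshot {
    final String releaseKey;

    ConfigSnapshot(String releaseKey) {
        this.releaseKey = releaseKey;
    }
}

// Hypothetical model of sync(): a change is detected by reference inequality.
final class SyncSketch {
    private final AtomicReference<ConfigSnapshot> cache = new AtomicReference<>();
    private final Supplier<ConfigSnapshot> remote; // returns null to model HTTP 304
    int changeEvents = 0; // stands in for fireRepositoryChange

    SyncSketch(Supplier<ConfigSnapshot> remote) {
        this.remote = remote;
    }

    synchronized void sync() {
        ConfigSnapshot previous = cache.get();
        ConfigSnapshot fetched = remote.get();
        // On "304" reuse the cached instance, so previous and current are the same object
        ConfigSnapshot current = (fetched == null) ? previous : fetched;
        if (previous != current) {
            cache.set(current);
            changeEvents++;
        }
    }
}
```

Comparing references avoids a field-by-field comparison of the configuration, at the cost of relying on the loader to return the cached instance when nothing changed.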
Summary flowchart: [figure]
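Two questions are left open: what do the listeners do once they are notified, and how does configservice work out that the configuration has not changed?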
Article URL: https://blog.csdn.net/m0_37268363/article/details/110494603