欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Apollo源码-配置发布通知客户端

程序员文章站 2021-11-23 10:50:19
前言在更新Spring Cloud Alibaba Nacos时,想到之前阅读过Apollo的源码,便在这插入记录了过来,后续更新Nacos Config源码Apollo简介fork 源码地址 apollo源码参考apollo架构中心设计主要分为 Config Service、Admin Service、Portal、Client 四部分上文介绍到ReleaseMessage 对象的发布,portal发布配置第一件事新增 Release 对象,第二件事发布ReleaseMessage 紧接着第三...

前言

在更新Spring Cloud Alibaba Nacos时,想到之前阅读过Apollo的源码,便在这插入记录了过来,后续更新Nacos Config源码

Apollo简介

fork 源码地址 apollo源码
参考apollo架构中心设计
主要分为 Config ServiceAdmin ServicePortalClient 四部分
上文介绍到ReleaseMessage 对象的发布,portal发布配置第一件事新增 Release 对象,第二件事发布ReleaseMessage 紧接着第三件事便是本文要讲的 ConfigPublishEvent 事件

新建ConfigPublishEvent事件

@EventListener
public void onConfigPublish(ConfigPublishEvent event) {
executorService.submit(new ConfigPublishNotifyTask(event.getConfigPublishInfo()));
}


private class ConfigPublishNotifyTask implements Runnable {

private ConfigPublishEvent.ConfigPublishInfo publishInfo;

ConfigPublishNotifyTask(ConfigPublishEvent.ConfigPublishInfo publishInfo) {
this.publishInfo = publishInfo;
}

@Override
public void run() {
ReleaseHistoryBO releaseHistory = getReleaseHistory();
if (releaseHistory == null) {
 Tracer.logError("Load release history failed", null);
 return;
}

sendPublishEmail(releaseHistory);

sendPublishMsg(releaseHistory);
}

跟踪代码也没发现和configservice的交互,事件监听者无非是创建一个线程池,执行线程任务,任务为发送release对象邮件,和调用远端hermes的一个接口。

配置发布,通知客户端

阅读官方文档 : 服务端设计文档中详细介绍了configservice是如何拉取ReleaseMessage的

实现方式如下:
1.Admin Service在配置发布后会往ReleaseMessage表插入一条消息记录,消息内容就是配置发布的AppId+Cluster+Namespace,参见DatabaseMessageSender

2.Config Service有一个线程会每秒扫描一次ReleaseMessage表,看看是否有新的消息记录,参见ReleaseMessageScanner
3.Config Service如果发现有新的消息记录,那么就会通知到所有的消息监听器(ReleaseMessageListener),如NotificationControllerV2,消息监听器的注册过程参见ConfigServiceAutoConfiguration
4.NotificationControllerV2得到配置发布的AppId+Cluster+Namespace后,会通知对应的客户端

如图 :
Apollo源码-配置发布通知客户端
那么我们跟着官方文档,来认识一下 ReleaseMessageScanner 这个类,该类实现了 InitializingBean 接口,会在容器启动,bean初始化后调用 afterPropertiesSet 方法

@Override
public void afterPropertiesSet() throws Exception {
//默认扫描间隔为1s
databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
//查询最大的一条Release记录
maxIdScanned = loadLargestMessageId();
//延迟1s后执行定时任务,受任务影响,需要等任务完成之后才开始计时
executorService.scheduleWithFixedDelay((Runnable) () -> {
Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
try {
 //扫描消息
 scanMessages();
 transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
 transaction.setStatus(ex);
 logger.error("Scan and send message failed", ex);
} finally {
 transaction.complete();
}
}, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);

}
private boolean scanAndSendMessages() {
//current batch is 500
List<ReleaseMessage> releaseMessages =
releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
if (CollectionUtils.isEmpty(releaseMessages)) {
return false;
}
//有新的message发布,通知configservice
fireMessageScanned(releaseMessages);
int messageScanned = releaseMessages.size();
//取最后一条数据的id,赋值最大id
maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
//是否还有数据
return messageScanned == 500;
}
private void fireMessageScanned(List<ReleaseMessage> messages) {
for (ReleaseMessage message : messages) {
for (ReleaseMessageListener listener : listeners) {
 try {
   //通知所有消息监听器,触发handleMessage方法  
   listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
 } catch (Throwable ex) {
   Tracer.logError(ex);
   logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
 }
}
}
}

fireMessageScanned 会去通知所有的监听者,我们看看 ReleaseMessageListener 这个类图
Apollo源码-配置发布通知客户端
监听器的注册过程参见 ConfigServiceAutoConfiguration , handleMessage 方法是得到发布的配置并处理,根据官方文档指示 : Config Service如果发现有新的消息记录,那么就会通知到所有的消息监听器 ,客户端具体定位到 NotificationControllerV2#handleMessage

@Override
public void handleMessage(ReleaseMessage message, String channel) {
logger.info("message received - channel: {}, message: {}", channel, message);
//内容即为 : aapId + cluster + namespace
String content = message.getMessage();
Tracer.logEvent("Apollo.LongPoll.Messages", content);
//如果channel不是apollo-release,则不处理
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
return;
}
//retrieveNamespaceFromReleaseMessage实现了Function的lambda表达式,apply是通过该表达式返回namespace
String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);

if (Strings.isNullOrEmpty(changedNamespace)) {
logger.error("message format invalid - {}", content);
return;
}

if (!deferredResults.containsKey(content)) {
return;
}

//create a new list to avoid ConcurrentModificationException
List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));

ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
//key : appId+cluster+namespace, value: messageId
configNotification.addMessage(content, message.getId());

//do async notification if too many clients
//客户端长轮询连接数 > 100
if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
largeNotificationBatchExecutorService.submit(() -> {
 logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
     bizConfig.releaseMessageNotificationBatch());
 for (int i = 0; i < results.size(); i++) {
   //100一个批次,就睡眠100ms
   if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
     try {
       //睡眠100ms
       TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
     } catch (InterruptedException e) {
       //ignore
     }
   }
   logger.debug("Async notify {}", results.get(i));
   //通知客户端,消息为:namespace,messageId
   results.get(i).setResult(configNotification);
 }
});
return;
}

logger.debug("Notify {} clients for key {}", results.size(), content);
//同步通知
for (DeferredResultWrapper result : results) {
result.setResult(configNotification);
}
logger.debug("Notification completed");
}
  • 拉取到消息,通知到所有的ReleaseMessageListener实现类 ReleaseMessageScanner#fireMessageScanned ,会调用 handleMessage 方法。
  • content:appId+cluster+namespace 中取出 namespace
  • 组装 ApolloConfigNotification 对象 messageId、namespaceName、key : appId+cluster+namespace, value: messageId 的map 通知客户端。
  • 客户端使用 DeferredResult 长轮询技术。

本文地址:https://blog.csdn.net/m0_37268363/article/details/110493806