ReleaseMessageScanner:扫描变更配置,触发监听者们
程序员文章站
2022-07-12 11:59:15
...
6、ReleaseMessageScanner:扫描变更配置,触发监听者们
这一步会定时(默认每秒一次)扫描第5步插入数据库的 ReleaseMessage(配置变更消息),扫描到之后逐条通知所有监听者(还记得 NotificationControllerV2 implements ReleaseMessageListener 吗?)
public class ReleaseMessageScanner implements InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(ReleaseMessageScanner.class);
@Autowired
private BizConfig bizConfig;
@Autowired
private ReleaseMessageRepository releaseMessageRepository;
// Delay between scans, in milliseconds; read from bizConfig in afterPropertiesSet().
private int databaseScanInterval;
// Listeners that are called for every scanned release message.
private List<ReleaseMessageListener> listeners;
// Executor that runs the recurring scan task.
private ScheduledExecutorService executorService;
// Id of the last message handed to the listeners; each scan queries for ids greater than this.
private long maxIdScanned;
/**
 * Creates a scanner with an empty copy-on-write listener list and a scheduled
 * executor backed by one thread named after this scanner.
 */
public ReleaseMessageScanner() {
  this.executorService = Executors.newScheduledThreadPool(
      1, ApolloThreadFactory.create("ReleaseMessageScanner", true));
  this.listeners = Lists.newCopyOnWriteArrayList();
}
/**
 * Reads the scan interval, records the current largest message id as the
 * starting point, and schedules the recurring scan task.
 *
 * @throws Exception declared by the overridden callback
 */
@Override
public void afterPropertiesSet() throws Exception {
  // Scan frequency in milliseconds (1000 ms by default, per the original note).
  databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
  // Start scanning after the largest ReleaseMessage id that already exists.
  maxIdScanned = loadLargestMessageId();

  // One scan pass, wrapped in a tracing transaction. Failures are logged and
  // never rethrown, so the executor keeps running the task.
  Runnable scanTask = () -> {
    Transaction tx = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
    try {
      scanMessages();
      tx.setStatus(Transaction.SUCCESS);
    } catch (Throwable failure) {
      tx.setStatus(failure);
      logger.error("Scan and send message failed", failure);
    } finally {
      tx.complete();
    }
  };

  // The same interval is used as both the initial delay and the delay between runs.
  executorService.scheduleWithFixedDelay(
      scanTask, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);
}
/**
 * Registers a listener for release messages; a listener that is already
 * registered is not added a second time.
 *
 * @param listener the listener to register
 */
public void addMessageListener(ReleaseMessageListener listener) {
  boolean alreadyRegistered = listeners.contains(listener);
  if (alreadyRegistered) {
    return;
  }
  listeners.add(listener);
}
/**
 * Scans batch after batch until a scan reports that no more messages remain
 * or the current thread has been interrupted.
 */
private void scanMessages() {
  while (!Thread.currentThread().isInterrupted()) {
    boolean moreRemaining = scanAndSendMessages();
    if (!moreRemaining) {
      return;
    }
  }
}
/**
 * Loads the next batch of messages whose id is greater than
 * {@code maxIdScanned}, hands them to the listeners, and advances
 * {@code maxIdScanned} to the id of the last message in the batch.
 *
 * @return whether there may be more messages to scan, i.e. the batch was full
 */
private boolean scanAndSendMessages() {
  // Batch size implied by the findFirst500... repository query.
  final int batchSize = 500;

  List<ReleaseMessage> batch =
      releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
  if (CollectionUtils.isEmpty(batch)) {
    return false;
  }

  // Notify the listeners before moving the cursor.
  fireMessageScanned(batch);

  int scannedCount = batch.size();
  // The batch is ordered by id ascending, so its last element carries the newest id.
  ReleaseMessage newest = batch.get(scannedCount - 1);
  maxIdScanned = newest.getId();

  return scannedCount == batchSize;
}
/**
 * Finds the largest release message id, which is used as the starting point
 * for scanning.
 *
 * @return the current largest message id, or 0 when the repository returns no message
 */
private long loadLargestMessageId() {
  ReleaseMessage latest = releaseMessageRepository.findTopByOrderByIdDesc();
  if (latest == null) {
    return 0;
  }
  return latest.getId();
}
/**
 * Notifies every registered listener of each loaded message, message by message.
 *
 * @param messages the messages loaded by the scan
 */
private void fireMessageScanned(List<ReleaseMessage> messages) {
  for (ReleaseMessage scanned : messages) {
    // Each message is delivered to all subscribers before the next one is handled.
    for (ReleaseMessageListener subscriber : listeners) {
      deliverToListener(subscriber, scanned);
    }
  }
}

/**
 * Delivers one message to one listener. Any failure is traced and logged so
 * that the remaining listeners and messages are still processed.
 */
private void deliverToListener(ReleaseMessageListener listener, ReleaseMessage message) {
  try {
    listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
  } catch (Throwable failure) {
    Tracer.logError(failure);
    logger.error("Failed to invoke message listener {}", listener.getClass(), failure);
  }
}
总结:
1、启动一个定时任务(间隔由 bizConfig.releaseMessageScanIntervalInMilli() 决定,默认每秒一次;采用 scheduleWithFixedDelay,即上一次扫描结束后再等待一个间隔)
2、根据已扫描的最大编号,查询数据库中 id 大于该编号的前500条消息(按id升序);若一批正好500条则继续扫描下一批
3、设置最新的最大编号
4、触发监听者们——还记得下面这段监听器注册代码吗?com.ctrip.framework.apollo.configservice.ConfigServiceAutoConfiguration.MessageScannerConfiguration#releaseMessageScanner
/**
 * Creates the ReleaseMessageScanner bean and registers its listeners, which
 * are called in registration order (per the original note, they receive
 * messages after namespaces change).
 *
 * @return the scanner with all listeners registered
 */
@Bean
public ReleaseMessageScanner releaseMessageScanner() {
  ReleaseMessageScanner scanner = new ReleaseMessageScanner();

  // 0. handle release message cache
  scanner.addMessageListener(releaseMessageServiceWithCache);

  // 1. handle gray release rule
  scanner.addMessageListener(grayReleaseRulesHolder);

  // 2. handle server cache
  scanner.addMessageListener(configService);
  scanner.addMessageListener(configFileController);

  // 3. notify clients
  scanner.addMessageListener(notificationControllerV2);
  scanner.addMessageListener(notificationController);

  return scanner;
}
}
上一篇: Apollo搭建踩过的那些坑
下一篇: 计算汉明权重