Apollo配置中心密钥模块源码解读
程序员文章站
2022-07-12 12:49:09
...
2020年3月份,携程发布了Apollo配置中心1.6.0版本,这个版本的一大亮点就是增加了访问密钥的验证以及管理功能,也就是说客户端必须用密钥对HTTP请求签名才可以访问配置信息。这样一来,不但提高了配置中心的安全性,也让配置中心部署到公共环境成为可能。
密钥模块主要包含2大功能:
- 通过管理平台以环境为单位设置不超过5个密钥
- 客户端用密钥对HTTP请求签名,从而可以访问并获取配置;如果签名非法,则返回错误代码
接下来将对密钥模块的主要源码进行解读,进一步了解该模块的业务逻辑
- AccessKeyServiceWithCache.java
实现了InitializingBean接口,是密钥的缓存实现类,通过将全部的密钥缓存到内存中,从而提高查询效率。
/**
 * In-memory cache of access keys, kept in sync with {@link AccessKeyRepository} by background
 * tasks.
 *
 * <p>Two indexes are maintained: appId -> access keys (used for lookups) and id -> access key
 * (used to detect replaced and deleted rows). After the blocking initial load performed in
 * {@link #afterPropertiesSet()}, both indexes are modified only by the single scheduler thread
 * created in {@link #initialize()}.
 */
@Service
public class AccessKeyServiceWithCache implements InitializingBean {

  private static final Logger logger = LoggerFactory.getLogger(AccessKeyServiceWithCache.class);

  /**
   * Page size of the incremental scan. Must stay equal to the limit hard-coded in the
   * repository method name ({@code findFirst500...}); also used to batch id lookups.
   */
  private static final int BATCH_SIZE = 500;

  // Access key repository
  private final AccessKeyRepository accessKeyRepository;
  private final BizConfig bizConfig;

  // Delay between two incremental scans, and its unit
  private int scanInterval;
  private TimeUnit scanIntervalTimeUnit;
  // Period of the rebuild task that prunes deleted keys, and its unit
  private int rebuildInterval;
  private TimeUnit rebuildIntervalTimeUnit;

  private ScheduledExecutorService scheduledExecutorService;
  // Last-modified time of the newest row merged so far; the next scan starts after it
  private Date lastTimeScanned;

  // appId -> access keys of that app
  private ListMultimap<String, AccessKey> accessKeyCache;
  // access key id -> the instance currently held in accessKeyCache
  private ConcurrentMap<Long, AccessKey> accessKeyIdCache;

  @Autowired
  public AccessKeyServiceWithCache(AccessKeyRepository accessKeyRepository, BizConfig bizConfig) {
    this.accessKeyRepository = accessKeyRepository;
    this.bizConfig = bizConfig;

    initialize();
  }

  /** Creates the scheduler and the empty caches. */
  private void initialize() {
    // Single-threaded scheduled pool: the scan task and the rebuild task never run concurrently
    scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
        ApolloThreadFactory.create("AccessKeyServiceWithCache", true));
    // Epoch start, so that the first scan picks up every row
    lastTimeScanned = new Date(0L);

    ListMultimap<String, AccessKey> multimap = ListMultimapBuilder.hashKeys(128)
        .arrayListValues().build();
    accessKeyCache = Multimaps.synchronizedListMultimap(multimap);
    accessKeyIdCache = Maps.newConcurrentMap();
  }

  /**
   * Returns the secrets of all enabled access keys cached for the given app.
   *
   * @param appId the application id
   * @return the enabled secrets, or an empty list when the app has no cached access key
   */
  public List<String> getAvailableSecrets(String appId) {
    // The list returned by a synchronized multimap is a live view. Iterating it (the stream
    // below) is only safe while holding the multimap's monitor; otherwise a concurrent merge
    // on the scheduler thread can fail the iteration.
    synchronized (accessKeyCache) {
      List<AccessKey> accessKeys = accessKeyCache.get(appId);
      if (CollectionUtils.isEmpty(accessKeys)) {
        return Collections.emptyList();
      }

      return accessKeys.stream()
          .filter(AccessKey::isEnabled)
          .map(AccessKey::getSecret)
          .collect(Collectors.toList());
    }
  }

  /**
   * Loads the scheduling parameters, performs the first scan synchronously and then schedules
   * the periodic scan and rebuild tasks.
   */
  @Override
  public void afterPropertiesSet() throws Exception {
    // Read the scheduling parameters
    populateDataBaseInterval();
    // Run synchronously so that start-up blocks until the initial load has finished
    scanNewAndUpdatedAccessKeys();

    // Fixed delay: the interval is counted from the end of the previous run
    scheduledExecutorService.scheduleWithFixedDelay(this::scanNewAndUpdatedAccessKeys,
        scanInterval, scanInterval, scanIntervalTimeUnit);
    // Fixed rate: the interval is counted from the start of the previous run
    scheduledExecutorService.scheduleAtFixedRate(this::rebuildAccessKeyCache,
        rebuildInterval, rebuildInterval, rebuildIntervalTimeUnit);
  }

  /** Traced wrapper around {@link #loadNewAndUpdatedAccessKeys()}; failures are logged only. */
  private void scanNewAndUpdatedAccessKeys() {
    Transaction transaction = Tracer.newTransaction("Apollo.AccessKeyServiceWithCache",
        "scanNewAndUpdatedAccessKeys");
    try {
      loadNewAndUpdatedAccessKeys();
      transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
      // Swallowed on purpose: an escaping exception would cancel the scheduled task
      transaction.setStatus(ex);
      logger.error("Load new/updated app access keys failed", ex);
    } finally {
      transaction.complete();
    }
  }

  /** Traced wrapper around {@link #deleteAccessKeyCache()}; failures are logged only. */
  private void rebuildAccessKeyCache() {
    Transaction transaction = Tracer.newTransaction("Apollo.AccessKeyServiceWithCache",
        "rebuildCache");
    try {
      deleteAccessKeyCache();
      transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
      // Swallowed on purpose: an escaping exception would cancel the scheduled task
      transaction.setStatus(ex);
      logger.error("Rebuild cache failed", ex);
    } finally {
      transaction.complete();
    }
  }

  /**
   * Pages through all rows modified after {@link #lastTimeScanned}, oldest first, and merges
   * them into the caches. Stops when a page is not full or the thread is interrupted.
   */
  private void loadNewAndUpdatedAccessKeys() {
    boolean hasMore = true;
    while (hasMore && !Thread.currentThread().isInterrupted()) {
      // Rows modified strictly after lastTimeScanned, ascending by modified time, first page
      List<AccessKey> accessKeys = accessKeyRepository
          .findFirst500ByDataChangeLastModifiedTimeGreaterThanOrderByDataChangeLastModifiedTimeAsc(lastTimeScanned);
      if (CollectionUtils.isEmpty(accessKeys)) {
        break;
      }

      int scanned = accessKeys.size();
      mergeAccessKeys(accessKeys);
      logger.info("Loaded {} new/updated Accesskey from startTime {}", scanned, lastTimeScanned);

      // A full page means more matching rows may remain
      hasMore = scanned == BATCH_SIZE;
      lastTimeScanned = accessKeys.get(scanned - 1).getDataChangeLastModifiedTime();

      // The page may have been cut in the middle of a group of rows sharing the same modified
      // time; the next query is strictly-greater-than, so load that whole group explicitly.
      if (hasMore) {
        List<AccessKey> lastModifiedTimeAccessKeys =
            accessKeyRepository.findByDataChangeLastModifiedTime(lastTimeScanned);
        mergeAccessKeys(lastModifiedTimeAccessKeys);
        logger.info("Loaded {} new/updated Accesskey at lastModifiedTime {}",
            lastModifiedTimeAccessKeys.size(), lastTimeScanned);
      }
    }
  }

  /**
   * Merges freshly loaded rows into both caches, replacing any instance previously cached under
   * the same id.
   *
   * <p>The previous instance is always evicted, even when its modified time equals the new
   * row's: the same row can legitimately be loaded twice by one scan, and keeping both would
   * leave a stale entry in the appId index that a later delete could not remove.
   */
  private void mergeAccessKeys(List<AccessKey> accessKeys) {
    for (AccessKey accessKey : accessKeys) {
      AccessKey thatInCache = accessKeyIdCache.put(accessKey.getId(), accessKey);

      // Add the new instance before removing the old one so that a concurrent lookup never
      // observes the app without its key.
      accessKeyCache.put(accessKey.getAppId(), accessKey);
      if (thatInCache == null) {
        continue;
      }

      accessKeyCache.remove(thatInCache.getAppId(), thatInCache);
      if (accessKey.getDataChangeLastModifiedTime()
          .after(thatInCache.getDataChangeLastModifiedTime())) {
        logger.info("Found Accesskey changes, old: {}, new: {}", thatInCache, accessKey);
      }
    }
  }

  /**
   * Evicts cached access keys whose rows no longer exist: every cached id is looked up in the
   * repository, in batches, and ids that are not found are removed from both caches.
   */
  private void deleteAccessKeyCache() {
    List<Long> ids = Lists.newArrayList(accessKeyIdCache.keySet());
    if (CollectionUtils.isEmpty(ids)) {
      return;
    }

    List<List<Long>> partitionIds = Lists.partition(ids, BATCH_SIZE);
    for (List<Long> toRebuildIds : partitionIds) {
      Iterable<AccessKey> accessKeys = accessKeyRepository.findAllById(toRebuildIds);

      Set<Long> foundIds = Sets.newHashSet();
      for (AccessKey accessKey : accessKeys) {
        foundIds.add(accessKey.getId());
      }

      // Cached ids missing from the query result
      SetView<Long> deletedIds = Sets.difference(Sets.newHashSet(toRebuildIds), foundIds);
      handleDeletedAccessKeys(deletedIds);
    }
  }

  /** Removes the given ids from the id cache and their instances from the appId cache. */
  private void handleDeletedAccessKeys(Set<Long> deletedIds) {
    if (CollectionUtils.isEmpty(deletedIds)) {
      return;
    }

    for (Long deletedId : deletedIds) {
      AccessKey deleted = accessKeyIdCache.remove(deletedId);
      if (deleted == null) {
        continue;
      }

      accessKeyCache.remove(deleted.getAppId(), deleted);
      logger.info("Found AccessKey deleted, {}", deleted);
    }
  }

  /** Reads the scan and rebuild intervals and their time units from {@link BizConfig}. */
  private void populateDataBaseInterval() {
    scanInterval = bizConfig.accessKeyCacheScanInterval();
    scanIntervalTimeUnit = bizConfig.accessKeyCacheScanIntervalTimeUnit();
    rebuildInterval = bizConfig.accessKeyCacheRebuildInterval();
    rebuildIntervalTimeUnit = bizConfig.accessKeyCacheRebuildIntervalTimeUnit();
  }
}
- AccessKeyUtil.java
密钥工具类。
/**
 * Helper used by the client authentication filter: resolves the appId a request targets,
 * looks up that app's usable secrets and computes request signatures.
 */
@Component
public class AccessKeyUtil {

  private static final String URL_SEPARATOR = "/";
  private static final String URL_CONFIGS_PREFIX = "/configs/";
  private static final String URL_CONFIGFILES_JSON_PREFIX = "/configfiles/json/";
  private static final String URL_CONFIGFILES_PREFIX = "/configfiles/";
  private static final String URL_NOTIFICATIONS_PREFIX = "/notifications/v2";

  /**
   * Prefixes after which the appId is the next path segment. Order matters: the JSON prefix
   * must be tried before the shorter "/configfiles/" prefix it starts with.
   */
  private static final String[] APP_ID_IN_PATH_PREFIXES = {
      URL_CONFIGS_PREFIX, URL_CONFIGFILES_JSON_PREFIX, URL_CONFIGFILES_PREFIX
  };

  private final AccessKeyServiceWithCache accessKeyServiceWithCache;

  public AccessKeyUtil(AccessKeyServiceWithCache accessKeyServiceWithCache) {
    this.accessKeyServiceWithCache = accessKeyServiceWithCache;
  }

  /**
   * Looks up the usable secrets of an app.
   *
   * @param appId the application id
   * @return whatever the access key cache reports as available for that app
   */
  public List<String> findAvailableSecret(String appId) {
    return accessKeyServiceWithCache.getAvailableSecrets(appId);
  }

  /**
   * Extracts the appId from a request, based on its servlet path.
   *
   * @param request the incoming request
   * @return the path segment following a known prefix, the {@code appId} query parameter for
   *     notification requests, or {@code null} when the path matches no known prefix or the
   *     segment cannot be extracted
   */
  public String extractAppIdFromRequest(HttpServletRequest request) {
    final String path = request.getServletPath();

    // The first matching prefix decides; later prefixes are not consulted
    for (String prefix : APP_ID_IN_PATH_PREFIXES) {
      if (StringUtils.startsWith(path, prefix)) {
        return StringUtils.substringBetween(path, prefix, URL_SEPARATOR);
      }
    }

    // Notification requests carry the appId as a query parameter instead
    if (StringUtils.startsWith(path, URL_NOTIFICATIONS_PREFIX)) {
      return request.getParameter("appId");
    }

    return null;
  }

  /**
   * Computes the signature for a request.
   *
   * @param path the request path
   * @param query the raw query string; may be null or empty, in which case only the path is
   *     signed
   * @param timestampString the timestamp sent with the request
   * @param secret the secret to sign with
   * @return the value returned by {@code Signature.signature}
   */
  public String buildSignature(String path, String query, String timestampString, String secret) {
    final String pathWithQuery = Strings.isNullOrEmpty(query) ? path : path + "?" + query;
    return Signature.signature(timestampString, pathWithQuery, secret);
  }
}
- ClientAuthenticationFilter.java
实现了Filter接口,用于拦截请求并验证客户端使用密钥生成的签名。
/**
 * Servlet filter that authenticates client requests signed with an access key.
 *
 * <p>Requests for apps that have no available secret pass through unauthenticated; for all
 * other apps both the request timestamp and the signature must be valid.
 */
public class ClientAuthenticationFilter implements Filter {

  private static final Logger logger = LoggerFactory.getLogger(ClientAuthenticationFilter.class);

  /** Maximum accepted difference, in milliseconds, between request time and server time. */
  private static final long TIMESTAMP_INTERVAL = 60 * 1000L;

  private final AccessKeyUtil accessKeyUtil;

  public ClientAuthenticationFilter(AccessKeyUtil accessKeyUtil) {
    this.accessKeyUtil = accessKeyUtil;
  }

  @Override
  public void init(FilterConfig filterConfig) throws ServletException {
    //nothing
  }

  /**
   * Rejects the request with 400 when no appId can be extracted, and with 401 when the app has
   * available secrets but the timestamp or the signature is invalid; otherwise continues the
   * chain.
   */
  @Override
  public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain)
      throws IOException, ServletException {
    HttpServletRequest request = (HttpServletRequest) req;
    HttpServletResponse response = (HttpServletResponse) resp;

    String appId = accessKeyUtil.extractAppIdFromRequest(request);
    if (StringUtils.isBlank(appId)) {
      response.sendError(HttpServletResponse.SC_BAD_REQUEST, "InvalidAppId");
      return;
    }

    // Authentication is enforced only for apps that have at least one available secret
    List<String> availableSecrets = accessKeyUtil.findAvailableSecret(appId);
    if (!CollectionUtils.isEmpty(availableSecrets)) {
      String timestamp = request.getHeader(Signature.HTTP_HEADER_TIMESTAMP);
      String authorization = request.getHeader(Signature.HTTP_HEADER_AUTHORIZATION);

      // The request time must be within one minute of the server time
      if (!checkTimestamp(timestamp)) {
        logger.warn("Invalid timestamp. appId={},timestamp={}", appId, timestamp);
        response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "RequestTimeTooSkewed");
        return;
      }

      // The signature must match the one computed from one of the available secrets
      String path = request.getServletPath();
      String query = request.getQueryString();
      if (!checkAuthorization(authorization, availableSecrets, timestamp, path, query)) {
        logger.warn("Invalid authorization. appId={},authorization={}", appId, authorization);
        response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Unauthorized");
        return;
      }
    }

    chain.doFilter(request, response);
  }

  @Override
  public void destroy() {
    //nothing
  }

  /**
   * Checks that the request timestamp (epoch milliseconds) is within
   * {@link #TIMESTAMP_INTERVAL} of the current server time, in either direction.
   *
   * @param timestamp the raw header value; may be null
   * @return false when the value is missing, not a number, too old or too far in the future
   */
  private boolean checkTimestamp(String timestamp) {
    long requestTimeMillis;
    try {
      requestTimeMillis = Long.parseLong(timestamp);
    } catch (NumberFormatException ignored) {
      // Missing or malformed timestamp
      return false;
    }

    // Both bounds are required: without the lower bound a timestamp far in the future would
    // be accepted indefinitely, defeating the replay window.
    long skew = System.currentTimeMillis() - requestTimeMillis;
    return skew >= -TIMESTAMP_INTERVAL && skew <= TIMESTAMP_INTERVAL;
  }

  /**
   * Checks the signature carried by the authorization header against the signatures computed
   * from each available secret.
   *
   * @param authorization the raw header value; the signature is the part after the first ':'
   * @return true when the signature matches for at least one secret; false when the header is
   *     missing, carries no signature, or no secret matches
   */
  private boolean checkAuthorization(String authorization, List<String> availableSecrets,
      String timestamp, String path, String query) {
    String signature = null;
    if (authorization != null) {
      String[] split = authorization.split(":");
      if (split.length > 1) {
        signature = split[1];
      }
    }
    // A missing signature must never authenticate
    if (signature == null) {
      return false;
    }

    byte[] provided = signature.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    for (String secret : availableSecrets) {
      String availableSignature = accessKeyUtil.buildSignature(path, query, timestamp, secret);
      if (availableSignature == null) {
        continue;
      }
      // Constant-time comparison, so response timing does not leak how many leading
      // characters of the expected signature were guessed correctly
      byte[] expected = availableSignature.getBytes(java.nio.charset.StandardCharsets.UTF_8);
      if (java.security.MessageDigest.isEqual(provided, expected)) {
        return true;
      }
    }
    return false;
  }
}
通过对代码的进一步解析,我们大致了解了密钥模块的功能逻辑,但总体来说还是存在一些不足:
- 设置密钥的对象粒度过大,目前是针对整个环境设置密钥;如果能针对更小粒度的对象(比如应用、集群)设置密钥,灵活性将进一步提高。
- 密钥管理页面太粗糙,比如无法查询正在使用密钥的应用等。
当然这也只是第一版,相信携程会在以后的版本中优化得越来越好。
上一篇: rabbitmq安装
下一篇: RabbitMQ安装