欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Apollo配置中心**模块源码解读

程序员文章站 2022-07-12 12:49:09
...

2020年3月份,携程发布了apollo配置中心1.6.0版本,这个版本一大亮点就是增加了**的验证以及管理功能,也就是说客户端必须用**对http请求签名才可以访问配置信息,这样一来,不但提高了配置中心的安全性,也让配置中心部署到公共环境成为可能。

**模块主要包含2大功能:

  1. 通过管理平台以环境为单位设置不超过5个**
  2. 客户端用**对http请求签名从而可以访问并获取配置,如果非法返回错误代码

接下来将通过对**模块的主要的源码进行解读,进一步了解该模块的业务逻辑

  1. AccessKeyServiceWithCache.java
    实现了InitializingBean接口,**的缓存实现类,通过将全部的**缓存到内存中,从而提高查询效率。
@Service  
public class AccessKeyServiceWithCache implements InitializingBean {  
  
  private static Logger logger = LoggerFactory.getLogger(AccessKeyServiceWithCache.class);  
  //**仓库
  private final AccessKeyRepository accessKeyRepository;  
  private final BizConfig bizConfig;  
  //扫描间隔
  private int scanInterval;  
  //扫描间隔单位
  private TimeUnit scanIntervalTimeUnit;  
  //清除间隔
  private int rebuildInterval;  
  //清除间隔单位
  private TimeUnit rebuildIntervalTimeUnit;  
  private ScheduledExecutorService scheduledExecutorService;  
  private Date lastTimeScanned;  
  
  private ListMultimap<String, AccessKey> accessKeyCache;  
  private ConcurrentMap<Long, AccessKey> accessKeyIdCache;  
  
  @Autowired  
  public AccessKeyServiceWithCache(AccessKeyRepository accessKeyRepository, BizConfig bizConfig) {  
    this.accessKeyRepository = accessKeyRepository;  
    this.bizConfig = bizConfig;  
  
    initialize();  
  }  
  //初始化方法
  private void initialize() {  
    //这里定义了一个延迟线程池
    scheduledExecutorService = new ScheduledThreadPoolExecutor(1,  
        ApolloThreadFactory.create("AccessKeyServiceWithCache", true));  
    lastTimeScanned = new Date(0L);  
  
    ListMultimap<String, AccessKey> multimap = ListMultimapBuilder.hashKeys(128)  
        .arrayListValues().build();  
    accessKeyCache = Multimaps.synchronizedListMultimap(multimap);  
    accessKeyIdCache = Maps.newConcurrentMap();  
  }  
  //找出应用的有效**
  public List<String> getAvailableSecrets(String appId) {  
    List<AccessKey> accessKeys = accessKeyCache.get(appId);  
    if (CollectionUtils.isEmpty(accessKeys)) {  
      return Collections.emptyList();  
    }  
  
    return accessKeys.stream()  
        .filter(AccessKey::isEnabled)  
        .map(AccessKey::getSecret)  
        .collect(Collectors.toList());  
  }  
  
  @Override  
  public void afterPropertiesSet() throws Exception {  
    //初始化参数
    populateDataBaseInterval();  
    //此方法的作用是阻止启动进程直到加载完成
    scanNewAndUpdatedAccessKeys();
    //间隔执行任务与scheduleAtFixedRate区别是任务执行完开始计算间隔时间
    scheduledExecutorService.scheduleWithFixedDelay(this::scanNewAndUpdatedAccessKeys,  
        scanInterval, scanInterval, scanIntervalTimeUnit);  
    //间隔执行任务
    scheduledExecutorService.scheduleAtFixedRate(this::rebuildAccessKeyCache,  
        rebuildInterval, rebuildInterval, rebuildIntervalTimeUnit);  
  }  
  
  //扫描是否有新的**,如果有则更新到缓存中
  private void scanNewAndUpdatedAccessKeys() {  
    Transaction transaction = Tracer.newTransaction("Apollo.AccessKeyServiceWithCache",  
        "scanNewAndUpdatedAccessKeys");  
    try { 
      //扫描是否有新的**,如果有则更新到缓存中 
      loadNewAndUpdatedAccessKeys();  
      transaction.setStatus(Transaction.SUCCESS);  
    } catch (Throwable ex) {  
      transaction.setStatus(ex);  
      logger.error("Load new/updated app access keys failed", ex);  
    } finally {  
      transaction.complete();  
    }  
  }  
  //清除缓存中所有的**
  private void rebuildAccessKeyCache() {  
    Transaction transaction = Tracer.newTransaction("Apollo.AccessKeyServiceWithCache",  
        "rebuildCache");  
    try {  
      deleteAccessKeyCache();  
      transaction.setStatus(Transaction.SUCCESS);  
    } catch (Throwable ex) {  
      transaction.setStatus(ex);  
      logger.error("Rebuild cache failed", ex);  
    } finally {  
      transaction.complete();  
    }  
  }  
  //扫描是否有新的**,如果有则更新到缓存中
  private void loadNewAndUpdatedAccessKeys() {  
    boolean hasMore = true;  
    while (hasMore && !Thread.currentThread().isInterrupted()) {  
      //最后一次修改时间大于当前传入时间的所有**,并按最后修改时间升序排列,然后取前500条
      List<AccessKey> accessKeys = accessKeyRepository  
          .findFirst500ByDataChangeLastModifiedTimeGreaterThanOrderByDataChangeLastModifiedTimeAsc(lastTimeScanned);  
      if (CollectionUtils.isEmpty(accessKeys)) {  
        break;  
      }  
  
      int scanned = accessKeys.size();  
      //查询出的**列表并入**缓存中
      mergeAccessKeys(accessKeys);  
      logger.info("Loaded {} new/updated Accesskey from startTime {}", scanned, lastTimeScanned);  
      //如果列表数量等于500,可能任然存在符合条件的数据
      hasMore = scanned == 500;  
      lastTimeScanned = accessKeys.get(scanned - 1).getDataChangeLastModifiedTime();  
      //为了避免最后时刻丢失一些数据,我们需要在此时扫描记录
      if (hasMore) {  
        List<AccessKey> lastModifiedTimeAccessKeys = accessKeyRepository.findByDataChangeLastModifiedTime(lastTimeScanned);  
        mergeAccessKeys(lastModifiedTimeAccessKeys);  
        logger.info("Loaded {} new/updated Accesskey at lastModifiedTime {}", scanned, lastTimeScanned);  
      }  
    }  
  }  
  //查询出的**列表并入**缓存中
  private void mergeAccessKeys(List<AccessKey> accessKeys) {  
    for (AccessKey accessKey : accessKeys) {  
      AccessKey thatInCache = accessKeyIdCache.get(accessKey.getId());  
  
      accessKeyIdCache.put(accessKey.getId(), accessKey);  
      accessKeyCache.put(accessKey.getAppId(), accessKey);  
  
      if (thatInCache != null && accessKey.getDataChangeLastModifiedTime()  
          .after(thatInCache.getDataChangeLastModifiedTime())) {  
        accessKeyCache.remove(accessKey.getAppId(), thatInCache);  
        logger.info("Found Accesskey changes, old: {}, new: {}", thatInCache, accessKey);  
      }  
    }  
  }  
  //清除缓存中所有的**
  private void deleteAccessKeyCache() {  
    List<Long> ids = Lists.newArrayList(accessKeyIdCache.keySet());  
    if (CollectionUtils.isEmpty(ids)) {  
      return;  
    }  
  
    List<List<Long>> partitionIds = Lists.partition(ids, 500);  
    for (List<Long> toRebuildIds : partitionIds) {  
      Iterable<AccessKey> accessKeys = accessKeyRepository.findAllById(toRebuildIds);  
  
      Set<Long> foundIds = Sets.newHashSet();  
      for (AccessKey accessKey : accessKeys) {  
        foundIds.add(accessKey.getId());  
      }  
  
      SetView<Long> deletedIds = Sets.difference(Sets.newHashSet(toRebuildIds), foundIds);  
      handleDeletedAccessKeys(deletedIds);  
    }  
  }  
  
  private void handleDeletedAccessKeys(Set<Long> deletedIds) {  
    if (CollectionUtils.isEmpty(deletedIds)) {  
      return;  
    }  
    for (Long deletedId : deletedIds) {  
      AccessKey deleted = accessKeyIdCache.remove(deletedId);  
      if (deleted == null) {  
        continue;  
      }  
  
      accessKeyCache.remove(deleted.getAppId(), deleted);  
      logger.info("Found AccessKey deleted, {}", deleted);  
    }  
  }  
  //初始化参数
  private void populateDataBaseInterval() {  
    //扫描间隔
    scanInterval = bizConfig.accessKeyCacheScanInterval();  
    //扫描间隔单位
    scanIntervalTimeUnit = bizConfig.accessKeyCacheScanIntervalTimeUnit(); 
    //清空**间隔
    rebuildInterval = bizConfig.accessKeyCacheRebuildInterval(); 
     //清空**间隔单位
    rebuildIntervalTimeUnit = bizConfig.accessKeyCacheRebuildIntervalTimeUnit();  
  }  
}  

  1. AccessKeyUtil.java
    **工具类。
@Component  
public class AccessKeyUtil {  
  
  private static final String URL_SEPARATOR = "/";  
  private static final String URL_CONFIGS_PREFIX = "/configs/";  
  private static final String URL_CONFIGFILES_JSON_PREFIX = "/configfiles/json/";  
  private static final String URL_CONFIGFILES_PREFIX = "/configfiles/";  
  private static final String URL_NOTIFICATIONS_PREFIX = "/notifications/v2";  
  
  private final AccessKeyServiceWithCache accessKeyServiceWithCache;  
  
  public AccessKeyUtil(AccessKeyServiceWithCache accessKeyServiceWithCache) {  
    this.accessKeyServiceWithCache = accessKeyServiceWithCache;  
  }  
  //找出应用的有效**
  public List<String> findAvailableSecret(String appId) {  
    return accessKeyServiceWithCache.getAvailableSecrets(appId);  
  }  
  //从请求数据中心查找出appid 
  public String extractAppIdFromRequest(HttpServletRequest request) {  
    String appId = null;  
    String servletPath = request.getServletPath();  
  
    if (StringUtils.startsWith(servletPath, URL_CONFIGS_PREFIX)) {  
      appId = StringUtils.substringBetween(servletPath, URL_CONFIGS_PREFIX, URL_SEPARATOR);  
    } else if (StringUtils.startsWith(servletPath, URL_CONFIGFILES_JSON_PREFIX)) {  
      appId = StringUtils.substringBetween(servletPath, URL_CONFIGFILES_JSON_PREFIX, URL_SEPARATOR);  
    } else if (StringUtils.startsWith(servletPath, URL_CONFIGFILES_PREFIX)) {  
      appId = StringUtils.substringBetween(servletPath, URL_CONFIGFILES_PREFIX, URL_SEPARATOR);  
    } else if (StringUtils.startsWith(servletPath, URL_NOTIFICATIONS_PREFIX)) {  
      appId = request.getParameter("appId");  
    }  
  
    return appId;  
  }  
  
  public String buildSignature(String path, String query, String timestampString, String secret) {  
    String pathWithQuery = path;  
    if (!Strings.isNullOrEmpty(query)) {  
      pathWithQuery += "?" + query;  
    }  
  
    return Signature.signature(timestampString, pathWithQuery, secret);  
  }  
} 

  1. ClientAuthenticationFilter.java
    实现了filter接口,用于拦截验证客户端**。
public class ClientAuthenticationFilter implements Filter {  
  
  private static final Logger logger = LoggerFactory.getLogger(ClientAuthenticationFilter.class);  
  
  private static final Long TIMESTAMP_INTERVAL = 60 * 1000L;  
  
  private final AccessKeyUtil accessKeyUtil;  
  
  public ClientAuthenticationFilter(AccessKeyUtil accessKeyUtil) {  
    this.accessKeyUtil = accessKeyUtil;  
  }  
  
  @Override  
  public void init(FilterConfig filterConfig) throws ServletException {  
    //nothing  
  }  
  
  @Override  
  public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain)  
      throws IOException, ServletException {  
    HttpServletRequest request = (HttpServletRequest) req;  
    HttpServletResponse response = (HttpServletResponse) resp;  
  
    String appId = accessKeyUtil.extractAppIdFromRequest(request);  
    if (StringUtils.isBlank(appId)) {  
      response.sendError(HttpServletResponse.SC_BAD_REQUEST, "InvalidAppId");  
      return;  
    }  
    //找出应用的有效**
    List<String> availableSecrets = accessKeyUtil.findAvailableSecret(appId);  
    if (!CollectionUtils.isEmpty(availableSecrets)) {  
      String timestamp = request.getHeader(Signature.HTTP_HEADER_TIMESTAMP);  
      String authorization = request.getHeader(Signature.HTTP_HEADER_AUTHORIZATION);  
  
      // 必须在一分钟内有效  
      if (!checkTimestamp(timestamp)) {  
        logger.warn("Invalid timestamp. appId={},timestamp={}", appId, timestamp);  
        response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "RequestTimeTooSkewed");  
        return;  
      }  
  
      // 身份校验,验证失败返回
      String path = request.getServletPath();  
      String query = request.getQueryString();  
      if (!checkAuthorization(authorization, availableSecrets, timestamp, path, query)) {  
        logger.warn("Invalid authorization. appId={},authorization={}", appId, authorization);  
        response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Unauthorized");  
        return;  
      }  
    }  
  
    chain.doFilter(request, response);  
  }  
  
  @Override  
  public void destroy() {  
    //nothing  
  }  
  
  private boolean checkTimestamp(String timestamp) {  
    long requestTimeMillis = 0L;  
    try {  
      requestTimeMillis = Long.parseLong(timestamp);  
    } catch (NumberFormatException e) {  
      // nothing to do  
    }  
  
    long x = System.currentTimeMillis() - requestTimeMillis;  
    return x <= TIMESTAMP_INTERVAL;  
  }  
  
  private boolean checkAuthorization(String authorization, List<String> availableSecrets,  
      String timestamp, String path, String query) {  
  
    String signature = null;  
    if (authorization != null) {  
      String[] split = authorization.split(":");  
      if (split.length > 1) {  
        signature = split[1];  
      }  
    }  
  
    for (String secret : availableSecrets) {  
      String availableSignature = accessKeyUtil.buildSignature(path, query, timestamp, secret);  
      if (Objects.equals(signature, availableSignature)) {  
        return true;  
      }  
    }  
    return false;  
  }  
}  

通过对代码进一步的解析,也大致了解了**模块的功能逻辑,但是总体来说还是存在一些不足:

  1. 设置**的对象粒度过大,目前是对于整个环境设计**,相反,如果能针对更小颗粒的对象比如应用,集群设计**,灵活性将进一步提高。
  2. **管理页面太粗糙,比如无法查询正在使用**的应用等。

当然这也只是第一版,相信携程会在以后的版本中优化的越来越好。

相关标签: java spring

上一篇: rabbitmq安装

下一篇: RabbitMQ安装