欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Soul网关源码分析-14期

程序员文章站 2022-06-03 20:24:13
...



后台与网关数据同步 (Http长轮询篇 <二>)


总结下后台与网关的长轮询同步流程, 会将重点放在后台的处理上.

长轮询的流程总体分两个模块: 一是请求接入, 二是变更通知



后台请求接入


网关启动后, 会与后台建立连接来获得数据的同步, 而长轮询下网关建立连接的方式就是通过 http 请求后台.


下面展示下网关发送请求到后台时, 后台的处理流程:

接收请求
hold住请求
缓存请求
延迟60s后释放请求

这几个处理步骤被分散到下面这些类的方法协作中:

ConfigController.listener
HttpLongPollingDataChangedListener.doLongPolling
LongPollingClient.run

ConfigController#listener: 作为Controller 层的方法, 提供 /configs/listener 路径供网关调用.

@RestController
@RequestMapping("/configs")
public class ConfigController {
  
  @PostMapping(value = "/listener")
  public void listener(final HttpServletRequest request, final HttpServletResponse response) {
    longPollingListener.doLongPolling(request, response);
  }
}

HttpLongPollingDataChangedListener#doLongPolling: 比对数据是否发生变化, 有变化则返回响应给网关, 无变化则 hold 住请求 60s

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
  
  public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {

    // 比较数据是否更新的方法, 非常重要但这里先不分析, 会放到细节模块讲
    List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
    String clientIp = getRemoteIp(request);

    // 有变化的信息就直接构造响应信息并返回
    if (CollectionUtils.isNotEmpty(changedGroup)) {
      this.generateResponse(response, changedGroup);
      log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
      return;
    }

    // 将请求转换为异步方式, 并且不限制超时时间, 这里就 hold 住请求
    final AsyncContext asyncContext = request.startAsync();
    asyncContext.setTimeout(0L);

    // 另起线程执行 LongPollingClient run方法
    scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
  }
}

LongPollingClient#run: 将这次的请求加入内存缓存 (一个 BlockingQueue 阻塞队列), 并启动一个延时线程, 做释放此次请求的工作.

class LongPollingClient implements Runnable {
  
  @Override
  public void run() {
    // 延时线程 60s 后执行
    this.asyncTimeoutFuture = scheduler.schedule(() -> {
      // 内存缓存中去除该对象
      clients.remove(LongPollingClient.this);
      // 得到变化的数据类型
      List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
      // 释放请求
      sendResponse(changedGroups);
    }, timeoutTime, TimeUnit.MILLISECONDS);
    // 自定义请求对象添加内存缓存
    clients.add(this);
  }
}




后台变更通知


后台数据变动时, 会有数据变动事件发出, 经过 spring 发布订阅的功能, 走到我们实现的订阅分发器 DataChangedEventDispatcher , 它会通知我们的长轮询监听器有数据变动.


下面展示下后台数据变动时长轮询的处理流程:

数据发生变动
分发到长轮询监听器
更新内存缓存
剔除维护的接入请求
通知数据变更及释放请求

对应的实际代码实现如下:

DataChangedEventDispatcher
AbstractDataChangedListener
HttpLongPollingDataChangedListener
DataChangeTask
LongPollingClient.sendResponse

DataChangedEventDispatcher: 处理数据信息变动并通知监听器.


AbstractDataChangedListener: 更新维护的数据信息缓存, 并调用子类需实现的 afterPluginChanged() 方法

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {

	@Override
  public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
    if (CollectionUtils.isEmpty(changed)) {
      return;
    }
    this.updatePluginCache();
    this.afterPluginChanged(changed, eventType);
  }
}

HttpLongPollingDataChangedListener: 开启线程通知各个维护的请求, 并传入变动事件类型

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
  
	@Override
  protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
    scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN));
  }
}

DataChangeTask: 循环所有持有的请求, 通知他们数据变动的类型信息, 并剔除维护的队列

class DataChangeTask implements Runnable {

	@Override
  public void run() {
    for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
      LongPollingClient client = iter.next();
      iter.remove();
      // 调用 LongPollingClient 的 sendResponse() 释放请求
      client.sendResponse(Collections.singletonList(groupKey));
      log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
    }
  }
}

LongPollingClient#sendResponse: 取消之前开启的延迟定时任务, 生成事件类型变更响应, 并释放请求.

class LongPollingClient implements Runnable {
  
  void sendResponse(final List<ConfigGroupEnum> changedGroups) {
    // 取消延迟任务, 对应 run() 方法的开启延迟任务
    if (null != asyncTimeoutFuture) {
      asyncTimeoutFuture.cancel(false);
    }
    // 生成对应事件类型变动的响应信息
    generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
    // 释放请求
    asyncContext.complete();
  }
}




后台细节


来分析 Http 长轮询的细节, 从上期的两个遗留问题开始.

  1. 数据怎样知道是有变化的, 是不是设置个最后更新时间, 与网关的请求时间比较, 得出是否有数据修改?
  2. 那些用于更新的数据放哪里, 用缓存的话, 考虑后台缓存与数据库的交互是怎样的.



更新数据如何鉴别


之前的流程总结中我们看到一个非常重要的方法 HttpLongPollingDataChangedListener#compareChangedGroup, 看看它的实现:

private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) {
  // 变更事件枚举, 如 plugin、metadata、rule、selector 等
  List<ConfigGroupEnum> changedGroup = new ArrayList<>(ConfigGroupEnum.values().length);
  // 遍历所有系统定义的事件类型
  for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
    String[] params = StringUtils.split(request.getParameter(group.name()), ',');
    if (params == null || params.length != 2) {
      throw new SoulException("group param invalid:" + request.getParameter(group.name()));
    }
    // 针对某个事件类型(如plugin), 拿到网关数据的 MD5 值, 网关的数据最后更新时间, 以及后台缓存
    String clientMd5 = params[0];
    long clientModifyTime = NumberUtils.toLong(params[1]);
    ConfigDataCache serverCache = CACHE.get(group.name());
    // 检测是否有变化, 有则加入变化的事件类型集合中
    if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) {
      changedGroup.add(group);
    }
  }
  return changedGroup;
}

这里可以得知, 在鉴别是否有数据变化时, 会拿网关请求信息中的对应 每个数据类型 的 “MD5值” 和 “最后更新时间” , 与当前后台缓存中对应这些事件的缓存信息做比较. 返回的响应信息也仅是包含所有变化的数据类型.



网关得到变化数据类型后还要做什么


即使后台通知网关数据变化, 网关也只能从响应信息中, 获得变化的数据类型, 并不能直接获得最新数据信息, 所以网关还需主动向后台请求最新的信息.


可以在 ConfigController 这里找到后台提供给网关的获取信息的接口:

@RestController
@RequestMapping("/configs")
public class ConfigController {
  
  @GetMapping("/fetch")
  public SoulAdminResult fetchConfigs(@NotNull final String[] groupKeys) {
    Map<String, ConfigData<?>> result = Maps.newHashMap();
    for (String groupKey : groupKeys) {
      // 调用对应数据类型的最新数据
      ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));
      result.put(groupKey, data);
    }
    return SoulAdminResult.success(SoulResultMessage.SUCCESS, result);
  }
}

网关传入要获取的数据类型, 后台就会从长轮询监听器中拿取对应数据, 看看 featchConfig() 方法的实现.

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {

	public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {
    // 从缓存中获取对应数据类型的数据
    ConfigDataCache config = CACHE.get(groupKey.name());
    // 数据封装对象
    switch (groupKey) {
      case APP_AUTH:
        List<AppAuthData> appAuthList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<AppAuthData>>() {
        }.getType());
        return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), appAuthList);
      case PLUGIN:
        List<PluginData> pluginList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<PluginData>>() {
        }.getType());
        return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), pluginList);
      case RULE:
        List<RuleData> ruleList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<RuleData>>() {
        }.getType());
        return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), ruleList);
      case SELECTOR:
        List<SelectorData> selectorList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<SelectorData>>() {
        }.getType());
        return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), selectorList);
      case META_DATA:
        List<MetaData> metaList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<MetaData>>() {
        }.getType());
        return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), metaList);
      default:
        throw new IllegalStateException("Unexpected groupKey: " + groupKey);
    }
  }
}



CACHE 缓存数据的来源


HttpLongPollingDataChangedListener 长轮询监听类有重写 afterInitialize() 方法, 它来自 spring 的 InitializingBean 接口, 在容器加载完 properties 配置表后会执行到.

@Override
protected void afterInitialize() {
  long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
  // 每次间隔5分钟的定时调用模式
  scheduler.scheduleWithFixedDelay(() -> {
    log.info("http sync strategy refresh config start.");
    try {
      // 刷新缓存 CACHE
      this.refreshLocalCache();
      log.info("http sync strategy refresh config success.");
    } catch (Exception e) {
      log.error("http sync strategy refresh config error!", e);
    }
  }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
  log.info("http sync strategy refresh interval: {}ms", syncInterval);
}

其中的 refreshLocalCache() 方法就是 CACHE 缓存的来源之一, 从数据库加载数据 (另一个来源就是上面我们分析的数据变动通知)

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
  
  private void refreshLocalCache() {
    this.updateAppAuthCache();
    this.updatePluginCache();
    this.updateRuleCache();
    this.updateSelectorCache();
    this.updateMetaDataCache();
  }
}
public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
  
  protected void updatePluginCache() {
    // pluginService.listAll() 从插件数据表中获得全部数据
    this.updateCache(ConfigGroupEnum.PLUGIN, pluginService.listAll());
  }
  
  protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
    String json = GsonUtils.getInstance().toJson(data);
    ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
    // 刷新缓存
    ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
    log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
  }
}

可以看到 HttpLongPollingDataChangedListener 重写了 afterInitialize() 方法, 将原先 AbstractDataChangedListener 仅启动时从数据库加载数据, 改为 5 分钟加载一次.


为什么要定时的加载呢? 暂时想到的可能性, 是集群的通知问题么… 这个疑问暂且保留.

相关标签: 网关 java