欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Soul网关源码分析-2期

程序员文章站 2022-06-03 20:26:51
...



今日任务


  1. 沿着 DividePlugin 下去, 分析 WebClientPlugin 转发请求的环节, 以及 WebClientResponsePlugin 响应外发.
  2. 分析 Selector 的用处, 选择器如何工作. (明日任务)
  3. 简单分析下插件链上的其他内置插件. (明日任务, 结合Dubbo服务分析)


开始一个请求调用


WebClientPluginexecute() 处拦截并分析:

public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
  final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
  assert soulContext != null;
  // 拿到下游服务的url
  String urlPath = exchange.getAttribute(Constants.HTTP_URL);
  if (StringUtils.isEmpty(urlPath)) {
    Object error = SoulResultWarp.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
    return WebFluxResultUtils.result(exchange, error);
  }
  long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
  log.info("you request,The resulting urlPath is :{}", urlPath);
  // 请求类型: Get请求orPost请求等
  HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
  // 构建一个请求对象空壳, 注入请求类型和URL
  WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
  return handleRequestBody(requestBodySpec, exchange, timeout, chain);
}

private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
                                         final ServerWebExchange exchange,
                                         final long timeout,
                                         final SoulPluginChain chain) {
  return requestBodySpec.headers(httpHeaders -> {
    // 补充上下文中请求头... 后面也是补充些属性, 不赘述
    httpHeaders.addAll(exchange.getRequest().getHeaders());
    httpHeaders.remove(HttpHeaders.HOST);
  })
    .contentType(buildMediaType(exchange))
    .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
    // 这里做了个转换, 将对象转换为Mono, 此方法向下深入已经看不明白了...
    .exchange()
    .doOnError(e -> log.error(e.getMessage()))
    .timeout(Duration.ofMillis(timeout))
    .flatMap(e -> doNext(e, exchange, chain));
}

这里并不是终点, 仅仅是构造了发送Http所需的对象并返回(这里我理解错误了, 下面有分析), 继续追溯, 找到是 SoulWebHandler 调用的插件链:

@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
  MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName());
  Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName());
  // 这里的plugins包含所有插件链的插件
  return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler)
    .doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time)));
}

可以debug到, 所有的插件: GlobalPluginWafPluginRateLimiterHystrixPluginDividePluginWebClientPluginWebSocketPluginMonitorPluginWebClientResponsePlugin

继续向上追溯了许久, 穿梭在spring-webflux中各种类间… 暂时放弃探索这块…

回到WebClientPlugin调用后的WebClientResponsePlugin上:

public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
  return chain.execute(exchange).then(Mono.defer(() -> {
    // 获取上下文中存放的响应信息
    ServerHttpResponse response = exchange.getResponse();
    ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
    if (Objects.isNull(clientResponse)
        || response.getStatusCode() == HttpStatus.BAD_GATEWAY
        || response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
      Object error = SoulResultWarp.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
      return WebFluxResultUtils.result(exchange, error);
    } else if (response.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT) {
      Object error = SoulResultWarp.error(SoulResultEnum.SERVICE_TIMEOUT.getCode(), SoulResultEnum.SERVICE_TIMEOUT.getMsg(), null);
      return WebFluxResultUtils.result(exchange, error);
    }
    // 各种拼装
    response.setStatusCode(clientResponse.statusCode());
    response.getCookies().putAll(clientResponse.cookies());
    response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
    return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()));
  }));
}

调用过程中, 有个很疑惑的点, WebClientPlugin 的请求, 到 WebClientResponsePlugin 的响应, 中间经历了什么? 一开始看的9个插件依次被调用, 但 WebClientPluginWebClientResponsePlugin 肯定是不同线程的工作, 那切换线程的转折点是哪里?

抱着这种想法, 在接口 SoulPlugin 这打了断点, 观察9个插件工作时的线程. 在 WebClientPlugin 结束后, MonitorPlugin 直接使用了另外的线程, WebClientResponsePlugin 沿用这个线程.

再次仔细查看发现, 之前的插件类, 仅是使用其父类 AbstractSoulPluginexecute() , 做插件链的下个执行. 而WebClientPlugin 这里, 则重写了execute() , 其用意也很明显, 不能继续直接用当前线程传递下去, 不然请求还没发出去, 就到了 WebClientResponsePlugin 了… 所以这里是发起了Http的调用并异步接收回调结果, 并继续执行插件链.

@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
  // ... 之前已分析, 这里省略代码
  return handleRequestBody(requestBodySpec, exchange, timeout, chain);
}

private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
                                         final ServerWebExchange exchange,
                                         final long timeout,
                                         final SoulPluginChain chain) {
  return requestBodySpec.headers(httpHeaders -> {
    httpHeaders.addAll(exchange.getRequest().getHeaders());
    httpHeaders.remove(HttpHeaders.HOST);
  })
    .contentType(buildMediaType(exchange))
    .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
    // 这里的exchange不仅是转换返回对象, 也真正的httpp调用了下游服务, 具体分析在后面些
    .exchange()
    .doOnError(e -> log.error(e.getMessage()))
    .timeout(Duration.ofMillis(timeout))
    .flatMap(e -> doNext(e, exchange, chain));

}

// 这个方法里, 将传入的响应信息 ClientResponse 放入上下文中, 并继续完成剩下的插件链调用
// 这里其实已经是异步的回调方法了, 在另一个线程中工作
private Mono<Void> doNext(final ClientResponse res, final ServerWebExchange exchange, final SoulPluginChain chain) {
  if (res.statusCode().is2xxSuccessful()) {
    exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
  } else {
    exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
  }
  exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);
  return chain.execute(exchange);
}

分析到这里就比较通透了, 9个插件组成的链, 在 WebClientPlugin 这里开始分叉, 异步回调后面剩余的两个插件, 由最后的插件 WebClientResponsePlugin 组装响应对象.

简单看下 .exchange() 这个方法的实现, 这里有关键的Http调用:

@Override
public Mono<ClientResponse> exchange() {
  ClientRequest request = (this.inserter != null ?
                           initRequestBuilder().body(this.inserter).build() :
                           initRequestBuilder().build());
  // 这里是关键调用, 会走到spring-web-reactive里
  return Mono.defer(() -> exchangeFunction.exchange(request)
                    .checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
                    .switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR));
}

追溯到 org.springframework.web.reactive.function.client.ExchangeFunction 下:

@Override
public Mono<ClientResponse> exchange(ClientRequest clientRequest) {
  Assert.notNull(clientRequest, "ClientRequest must not be null");
  HttpMethod httpMethod = clientRequest.method();
  URI url = clientRequest.url();
  String logPrefix = clientRequest.logPrefix();

  return this.connector
    .connect(httpMethod, url, httpRequest -> clientRequest.writeTo(httpRequest, this.strategies))
    .doOnRequest(n -> logRequest(clientRequest))
    .doOnCancel(() -> logger.debug(logPrefix + "Cancel signal (to close connection)"))
    .map(httpResponse -> {
      logResponse(httpResponse, logPrefix);
      return new DefaultClientResponse(
        httpResponse, this.strategies, logPrefix, httpMethod.name() + " " + url,
        () -> createRequest(clientRequest));
    });
}

这里就是请求发送, 并接受响应的地方了, 几个方法都比较容易看懂, 不做过多分析了.



总结


结合这两天的分析, 一个请求在网关内的流向, 有3个关键点:

  1. DividePlugin 的服务获取, 以及负载均衡选择
  2. WebClientPlugin 的Http请求调用, 以及异步回调接收响应, 以及将中断的插件链继续开发调用.
  3. WebClientResponsePlugin 对下游服务响应的封装.
相关标签: 网关 java