欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Soul网关源码分析-6期

程序员文章站 2022-06-03 20:27:03
...


今日任务

  • AlibabaDubboPlugin 研究
  • ApacheDubboPlugin 研究



AlibabaDubboPlugin


将该启动的都开启, Mysql、Zookeeper、AlibabaDubbo服务、Soul-admin、Soul-bootstrap.

值得注意的是, 启动网关时打印了这几行日志:

2021-01-19 19:56:44.680  INFO 1623 --- [ctReadThread-37] o.d.s.p.a.d.c.ApplicationConfigCache     : init aliaba dubbo reference success there meteData is :MetaData(id=1350474912353136640, appName=dubbo, contextPath=null, path=/dubbo/insert, rpcType=dubbo, serviceName=org.dromara.soul.test.dubbo.api.service.DubboTestService, methodName=insert, parameterTypes=org.dromara.soul.test.dubbo.api.entity.DubboTest, rpcExt={"timeout":10000}, enabled=true)
2021-01-19 19:56:44.829  INFO 1623 --- [ctReadThread-37] o.d.s.p.a.d.c.ApplicationConfigCache     : init aliaba dubbo reference success there meteData is :MetaData(id=1350474914580312064, appName=dubbo, contextPath=null, path=/dubbo/findByIdsAndName, rpcType=dubbo, serviceName=org.dromara.soul.test.dubbo.api.service.DubboMultiParamService, methodName=findByIdsAndName, parameterTypes=java.util.List,java.lang.String, rpcExt={"timeout":10000}, enabled=true)

看着像服务信息缓存相关, 先记下 `ApplicationConfigCache` 这个类.

这次我们有了以前的经验, 直接找到 SoulWebHandlerexecute() 方法, 看看调用的插件链长啥样:

plugins = {[email protected]}  size = 12
 0 = {[email protected]} 
 1 = {[email protected]} 
 2 = {[email protected]} 
 3 = {[email protected]} 
 4 = {[email protected]} 
 5 = {[email protected]} 
 6 = {[email protected]} 
 7 = {[email protected]} 
 8 = {[email protected]} 
 9 = {[email protected]} 
 10 = {[email protected]} 
 11 = {[email protected]} 

比起简单的HTTP调用, 多了两个插件 BodyParamPluginAlibabaDubboPluginDubboResponsePlugin, 这两个应该就是 Dubbo 服务相关的插件了.

继续跑跑链调用, 像之前的老熟人 DividePluginWebClientPluginWebClientResponsePluginWebSocketPlugin 都直接跳过了, 能理解, 毕竟走的 RpcType 类型为 dubbo.


最后再关注下线程变动信息, 因为有服务调用肯定应该是要异步的, 这里不是在调用服务就是准备调用, 总之它不老实就是了:

Soul网关源码分析-6期

Soul网关源码分析-6期

可以从以下截图看到, 插件调用在 BodyParamPluginAlibabaDubboPlugin 间线程发生变动, 看来 BodyParamPlugin 是关键的请求转发插件, 我们从它来开刀, 跟踪它的代码.

PS: 这里注意下, 此处 BodyParamPlugin 完整路径为: org.dromara.soul.plugin.alibaba.dubbo.param


public class BodyParamPlugin implements SoulPlugin {
	@Override
  public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
    final ServerHttpRequest request = exchange.getRequest();
    final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
    if (Objects.nonNull(soulContext) && RpcTypeEnum.DUBBO.getName().equals(soulContext.getRpcType())) {
      // 获取请求头的 contentType
      MediaType mediaType = request.getHeaders().getContentType();
      ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
      return serverRequest.bodyToMono(String.class)
        .switchIfEmpty(Mono.defer(() ->
                                  // 切换线程了
                                  Mono.just(""))
                      )
        .flatMap(body -> {
          // 判断 contentType 是否 json, 是则将 body 信息注入上下文, 用 key 标识是 dubbo参数
          if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
            exchange.getAttributes().put(Constants.DUBBO_PARAMS, body);
          }
          // 继续链调用
          return chain.execute(exchange);
        });
    }
    return chain.execute(exchange);
  }
}

没看到关键的调用服务的信息, 继续看下个插件 AlibabaPlugin (仅保留关键代码).

public class AlibabaDubboPlugin extends AbstractSoulPlugin {

  @Override
  protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
    // 获取 BodyParamPlugin 注入的 body 参数
    String body = exchange.getAttribute(Constants.DUBBO_PARAMS);
    // 获取元数据信息(路径、rpcType等)
    MetaData metaData = exchange.getAttribute(Constants.META_DATA);
    // Dubbo服务调用
    Object result = alibabaDubboProxyService.genericInvoker(body, metaData);
    // ..
    exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, result);
    return chain.execute(exchange);
  }
}

看来 alibabaDubboProxyService.genericInvoker() 这句就是调用关键了, 追踪被调用的 AlibabaDubboProxyService (仅保留核心代码):

public class AlibabaDubboProxyService {
  
  public Object genericInvoker(final String body, final MetaData metaData) throws SoulException {
    // 这里是关键, 通过服务名在缓存中获得服务信息
    ReferenceConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getServiceName());
    GenericService genericService = reference.get();
    // ...
    return genericService.$invoke(metaData.getMethodName(), new String[]{}, new Object[]{});
  }
}

方法的第一句就是关键, debug看看是什么信息:

<dubbo:reference protocol="dubbo" interface="org.dromara.soul.test.dubbo.api.service.DubboTestService" uniqueServiceName="org.dromara.soul.test.dubbo.api.service.DubboTestService" generic="true" generic="true" timeout="10000" id="org.dromara.soul.test.dubbo.api.service.DubboTestService" />

这里直接从网关缓存里, 获得了dubbo服务的接口相关配置, 那这个缓存的数据又是怎么来的呢? 还记得启动网关时, 打印的日志信息么, 代码里也能看到 ApplicationConfigCache 的存在. 我们通过那个日志, 追溯下这个类的具体生成缓存的方法:

public final class ApplicationConfigCache {
  
	public ReferenceConfig<GenericService> build(final MetaData metaData) {
    ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
    reference.setGeneric(true);
    reference.setApplication(applicationConfig);
    reference.setRegistry(registryConfig);
    reference.setInterface(metaData.getServiceName());
    reference.setProtocol("dubbo");
    // ...
    Object obj = reference.get();
    if (obj != null) {
      // 通过这行日志定位方法
      log.info("init aliaba dubbo reference success there meteData is :{}", metaData.toString());
      // 服务名称与信息放入缓存
      cache.put(metaData.getServiceName(), reference);
    }
    return reference;
  }
}

这里是加载缓存的地方, 那么这个方法是怎么被调用的? 追溯到 AlibabaDubboMetaDataSubscriber :

public class AlibabaDubboMetaDataSubscriber implements MetaDataSubscriber {
  private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();

  @Override
  public void onSubscribe(final MetaData metaData) {
    if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
      // ...
      // 这个方法最终会调到 build()
      ApplicationConfigCache.getInstance().initRef(metaData);
      META_DATA.put(metaData.getPath(), metaData);
    }
  }
}

其实看到它的接口类 MetaDataSubscriber 就能知道个大概了, 这又是个实现了元数据更新订阅的类, 会接收 soul-admin 管理后台推送的元数据信息, 筛选出 RpcType 为 Dubbo 类型的元数据进行缓存更新.

@RequiredArgsConstructor
public class MetaDataHandler extends AbstractDataHandler<MetaData> {
    
  private final List<MetaDataSubscriber> metaDataSubscribers;

  @Override
  protected void doRefresh(final List<MetaData> dataList) {
    metaDataSubscribers.forEach(MetaDataSubscriber::refresh);
    // 遍历所有元数据更新订阅类
    dataList.forEach(metaData -> metaDataSubscribers.forEach(metaDataSubscriber -> metaDataSubscriber.onSubscribe(metaData)));
  }

  // ...
}

最终的调用 dubbo 服务的代码这里再贴下, 都是 Alibaba Dubbo 框架的内容了, 不深入分析:

public class AlibabaDubboProxyService {
  
	public Object genericInvoker(final String body, final MetaData metaData) throws SoulException {
  // ...
  GenericService genericService = reference.get();
    if (null == body || "".equals(body) || "{}".equals(body) || "null".equals(body)) {
      return genericService.$invoke(metaData.getMethodName(), new String[]{}, new Object[]{});
    } else {
      Pair<String[], Object[]> pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
      return genericService.$invoke(metaData.getMethodName(), pair.getLeft(), pair.getRight());
    }
	}
}



ApacheDubboPlugin


开始分析 ApacheDubboPlugin 前, 先确认下 soul-bootstrap 网关的 pom.xml 文件, apache-dubbo相关的依赖是否启用, 尤其注意看 soul-spring-boot-starter-plugin-apache-dubbo 是否启用.

启动三个项目 soul-adminsoul-bootstrapsoul-test-apache-dubbo-service .


启动时报错 Duplicate key

打脸的是, 网关立马报了个错:

Caused by: java.lang.IllegalStateException: Duplicate key org.dromar[email protected]4e83a98
	...
	at org.dromara.soul.plugin.base.cache.CommonPluginDataSubscriber.<init>(CommonPluginDataSubscriber.java:46) ~[classes/:na]
	at org.dromara.soul.web.configuration.SoulConfiguration.pluginDataSubscriber(SoulConfiguration.java:97) ~[classes/:na]

不要紧张这些情况都是小case, 看到 CommonPluginDataSubscriber 这个类且看过我3期的分析文章 的同学, 应该会有种熟悉感… 不怕, 从头梳理下流程你就懂了.


首先 SoulConfiguration 配置类会加载一个叫做 pluginDataSubscriber 的 Bean, 用作插件数据的订阅, 当然订阅对象是后台管理系统:

public class SoulConfiguration {
	@Bean
  public PluginDataSubscriber pluginDataSubscriber(final ObjectProvider<List<PluginDataHandler>> pluginDataHandlerList) {
    return new CommonPluginDataSubscriber(pluginDataHandlerList.getIfAvailable(Collections::emptyList));
  }
}

这里的入参由来, 会借助 spring4.X 的机制获得所有父类为 PluginDataHandler 的实现类, 自然也会找到 CommonPluginDataSubscriber :

public class CommonPluginDataSubscriber implements PluginDataSubscriber {
	// ...
}

CommonPluginDataSubscriber 的构造器开始工作, 接收 pluginDataHandlerList 入参, 将这些 Bean 对象转换成一个缓存map:

private final Map<String, PluginDataHandler> handlerMap;

public CommonPluginDataSubscriber(final List<PluginDataHandler> pluginDataHandlerList) {
  // 异常问题就是此行
  this.handlerMap = pluginDataHandlerList.stream().collect(Collectors.toConcurrentMap(PluginDataHandler::pluginNamed, e -> e));
}

我们的异常就是在这里了, 构建hash时key重复了, 说明 pluginDataHandlerList 这个入参的 pluginName 属性有重复. 那么这个入参怎么来的呢? 自然是 spring注入的, 找到这些 Bean 即可.


答案就在我们网关中引入的 soul-spring-boot-starter-plugin-xx 项目中, 我现在不仅引入了 alibaba-dubbo , 也引入了 apache-dubbo , 这两个 PluginDataHandler 实现子类冲突了:

public class AlibabaDubboPluginDataHandler implements PluginDataHandler {

  @Override
  public String pluginNamed() {
    return PluginEnum.DUBBO.getName();
  }
}
public class ApacheDubboPluginDataHandler implements PluginDataHandler {
	@Override
  public String pluginNamed() {
    return PluginEnum.DUBBO.getName();
  }
}

怎么解决呢? 在网关中屏蔽调 soul-spring-boot-starter-plugin-alibaba-dubbo 这个依赖即可, 它的 AlibabaDubboPluginConfiguration 配置工厂不工作了自然不会有 AlibabaDubboPluginDataHandler 这个Bean了.

PS: 我挺想改了其中一个 pluginNamed() 返回值, 但鬼知道哪里会有这个枚举的使用, 不作死了.



正题

照旧找到 SoulWebHandlerexecute() 方法, 看看调用的插件链长啥样:

plugins = {[email protected]}  size = 11
 0 = {[email protected]} 
 1 = {[email protected]} 
 2 = {[email protected]} 
 3 = {[email protected]} 
 4 = {[email protected]} 
 5 = {[email protected]} 
 6 = {[email protected]} 
 7 = {[email protected]} 
 8 = {[email protected]} 
 9 = {[email protected]} 
 10 = {[email protected]} 

注意: 这里的 BodyParamPlugin 完整路径 org.dromara.soul.plugin.apache.dubbo.param


除了把 BodyParamPluginAlibabaDubboPlugin 换成 Apache包里的 BodyParamPluginApacheDubboPlugin , 就没啥区别了. 再具体看看 BodyParamPlugin 有什么不同:

public class BodyParamPlugin implements SoulPlugin {

  // .. 
  @Override
  public String named() {
    return "apache-dubbo-body-param";
  }
}

除了 named() 方法返回的字符串不一样, 没看出其他区别. 各种变量的类路径也都一样. 看看 ApacheDubboPlugin … 终于看出不一样了, 主要是调用的桩实现上不同 (仅保留核心代码):

public class ApacheDubboProxyService {

  public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {
    // ...
    GenericService genericService = reference.get();
    Pair<String[], Object[]> pair = new ImmutablePair<>(new String[]{}, new Object[]{});
    // 使用 apache dubbo 的异步回调
    CompletableFuture<Object> future = genericService.$invokeAsync(metaData.getMethodName(), pair.getLeft(), pair.getRight());
    return Mono.fromFuture(future.thenApply(ret -> {
      if (Objects.nonNull(ret)) {
        exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, ret);
      } else {
        exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, Constants.DUBBO_RPC_RESULT_EMPTY);
      }
      exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
      return ret;
    }));
  }
}

ApacheDubboPlugin 对服务的调用是异步回调的模型 $invokeAsync(), 不用阻塞线程等待结果. 而 AlibabaDubboPlugin 在这块的调用模型则是同步的 $invoke() .

这块我其实挺有疑惑的, 没研究过 Apache-dubbo 和 Alibaba-dubbo, 不太清楚为什么一个支持$invokeAsync() 而另一个仅支持 $invoke() . 今后会了解一二再出个番外篇.

相关标签: 网关 java