Soul网关源码分析-6期
今日任务
- AlibabaDubboPlugin 研究
- ApacheDubboPlugin 研究
AlibabaDubboPlugin
将该启动的都开启, Mysql、Zookeeper、AlibabaDubbo服务、Soul-admin、Soul-bootstrap.
值得注意的是, 启动网关时打印了这几行日志:
2021-01-19 19:56:44.680 INFO 1623 --- [ctReadThread-37] o.d.s.p.a.d.c.ApplicationConfigCache : init aliaba dubbo reference success there meteData is :MetaData(id=1350474912353136640, appName=dubbo, contextPath=null, path=/dubbo/insert, rpcType=dubbo, serviceName=org.dromara.soul.test.dubbo.api.service.DubboTestService, methodName=insert, parameterTypes=org.dromara.soul.test.dubbo.api.entity.DubboTest, rpcExt={"timeout":10000}, enabled=true)
2021-01-19 19:56:44.829 INFO 1623 --- [ctReadThread-37] o.d.s.p.a.d.c.ApplicationConfigCache : init aliaba dubbo reference success there meteData is :MetaData(id=1350474914580312064, appName=dubbo, contextPath=null, path=/dubbo/findByIdsAndName, rpcType=dubbo, serviceName=org.dromara.soul.test.dubbo.api.service.DubboMultiParamService, methodName=findByIdsAndName, parameterTypes=java.util.List,java.lang.String, rpcExt={"timeout":10000}, enabled=true)
看着像服务信息缓存相关, 先记下 `ApplicationConfigCache` 这个类.
这次我们有了以前的经验, 直接找到 SoulWebHandler
的 execute()
方法, 看看调用的插件链长啥样:
plugins = {[email protected]} size = 12
0 = {[email protected]}
1 = {[email protected]}
2 = {[email protected]}
3 = {[email protected]}
4 = {[email protected]}
5 = {[email protected]}
6 = {[email protected]}
7 = {[email protected]}
8 = {[email protected]}
9 = {[email protected]}
10 = {[email protected]}
11 = {[email protected]}
比起简单的HTTP调用, 多了两个插件 BodyParamPlugin
、AlibabaDubboPlugin
、DubboResponsePlugin
, 这两个应该就是 Dubbo 服务相关的插件了.
继续跑跑链调用, 像之前的老熟人 DividePlugin
、WebClientPlugin
、WebClientResponsePlugin
和 WebSocketPlugin
都直接跳过了, 能理解, 毕竟走的 RpcType 类型为 dubbo.
最后再关注下线程变动信息, 因为有服务调用肯定应该是要异步的, 这里不是在调用服务就是准备调用, 总之它不老实就是了:
可以从以下截图看到, 插件调用在 BodyParamPlugin
到 AlibabaDubboPlugin
间线程发生变动, 看来 BodyParamPlugin
是关键的请求转发插件, 我们从它来开刀, 跟踪它的代码.
PS: 这里注意下, 此处 BodyParamPlugin
完整路径为: org.dromara.soul.plugin.alibaba.dubbo.param
public class BodyParamPlugin implements SoulPlugin {
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
final ServerHttpRequest request = exchange.getRequest();
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
if (Objects.nonNull(soulContext) && RpcTypeEnum.DUBBO.getName().equals(soulContext.getRpcType())) {
// 获取请求头的 contentType
MediaType mediaType = request.getHeaders().getContentType();
ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
return serverRequest.bodyToMono(String.class)
.switchIfEmpty(Mono.defer(() ->
// 切换线程了
Mono.just(""))
)
.flatMap(body -> {
// 判断 contentType 是否 json, 是则将 body 信息注入上下文, 用 key 标识是 dubbo参数
if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
exchange.getAttributes().put(Constants.DUBBO_PARAMS, body);
}
// 继续链调用
return chain.execute(exchange);
});
}
return chain.execute(exchange);
}
}
没看到关键的调用服务的信息, 继续看下个插件 AlibabaPlugin
(仅保留关键代码).
public class AlibabaDubboPlugin extends AbstractSoulPlugin {
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
// 获取 BodyParamPlugin 注入的 body 参数
String body = exchange.getAttribute(Constants.DUBBO_PARAMS);
// 获取元数据信息(路径、rpcType等)
MetaData metaData = exchange.getAttribute(Constants.META_DATA);
// Dubbo服务调用
Object result = alibabaDubboProxyService.genericInvoker(body, metaData);
// ..
exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, result);
return chain.execute(exchange);
}
}
看来 alibabaDubboProxyService.genericInvoker()
这句就是调用关键了, 追踪被调用的 AlibabaDubboProxyService
(仅保留核心代码):
public class AlibabaDubboProxyService {
public Object genericInvoker(final String body, final MetaData metaData) throws SoulException {
// 这里是关键, 通过服务名在缓存中获得服务信息
ReferenceConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getServiceName());
GenericService genericService = reference.get();
// ...
return genericService.$invoke(metaData.getMethodName(), new String[]{}, new Object[]{});
}
}
方法的第一句就是关键, debug看看是什么信息:
<dubbo:reference protocol="dubbo" interface="org.dromara.soul.test.dubbo.api.service.DubboTestService" uniqueServiceName="org.dromara.soul.test.dubbo.api.service.DubboTestService" generic="true" generic="true" timeout="10000" id="org.dromara.soul.test.dubbo.api.service.DubboTestService" />
这里直接从网关缓存里, 获得了dubbo服务的接口相关配置, 那这个缓存的数据又是怎么来的呢? 还记得启动网关时, 打印的日志信息么, 代码里也能看到 ApplicationConfigCache
的存在. 我们通过那个日志, 追溯下这个类的具体生成缓存的方法:
public final class ApplicationConfigCache {
public ReferenceConfig<GenericService> build(final MetaData metaData) {
ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
reference.setGeneric(true);
reference.setApplication(applicationConfig);
reference.setRegistry(registryConfig);
reference.setInterface(metaData.getServiceName());
reference.setProtocol("dubbo");
// ...
Object obj = reference.get();
if (obj != null) {
// 通过这行日志定位方法
log.info("init aliaba dubbo reference success there meteData is :{}", metaData.toString());
// 服务名称与信息放入缓存
cache.put(metaData.getServiceName(), reference);
}
return reference;
}
}
这里是加载缓存的地方, 那么这个方法是怎么被调用的? 追溯到 AlibabaDubboMetaDataSubscriber
:
public class AlibabaDubboMetaDataSubscriber implements MetaDataSubscriber {
private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();
@Override
public void onSubscribe(final MetaData metaData) {
if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
// ...
// 这个方法最终会调到 build()
ApplicationConfigCache.getInstance().initRef(metaData);
META_DATA.put(metaData.getPath(), metaData);
}
}
}
其实看到它的接口类 MetaDataSubscriber
就能知道个大概了, 这又是个实现了元数据更新订阅的类, 会接收 soul-admin
管理后台推送的元数据信息, 筛选出 RpcType 为 Dubbo 类型的元数据进行缓存更新.
@RequiredArgsConstructor
public class MetaDataHandler extends AbstractDataHandler<MetaData> {
private final List<MetaDataSubscriber> metaDataSubscribers;
@Override
protected void doRefresh(final List<MetaData> dataList) {
metaDataSubscribers.forEach(MetaDataSubscriber::refresh);
// 遍历所有元数据更新订阅类
dataList.forEach(metaData -> metaDataSubscribers.forEach(metaDataSubscriber -> metaDataSubscriber.onSubscribe(metaData)));
}
// ...
}
最终的调用 dubbo 服务的代码这里再贴下, 都是 Alibaba Dubbo 框架的内容了, 不深入分析:
public class AlibabaDubboProxyService {
public Object genericInvoker(final String body, final MetaData metaData) throws SoulException {
// ...
GenericService genericService = reference.get();
if (null == body || "".equals(body) || "{}".equals(body) || "null".equals(body)) {
return genericService.$invoke(metaData.getMethodName(), new String[]{}, new Object[]{});
} else {
Pair<String[], Object[]> pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
return genericService.$invoke(metaData.getMethodName(), pair.getLeft(), pair.getRight());
}
}
}
ApacheDubboPlugin
开始分析 ApacheDubboPlugin 前, 先确认下 soul-bootstrap
网关的 pom.xml
文件, apache-dubbo相关的依赖是否启用, 尤其注意看 soul-spring-boot-starter-plugin-apache-dubbo
是否启用.
启动三个项目 soul-admin
、soul-bootstrap
、soul-test-apache-dubbo-service
.
启动时报错 Duplicate key
打脸的是, 网关立马报了个错:
Caused by: java.lang.IllegalStateException: Duplicate key org.dromar[email protected]4e83a98
...
at org.dromara.soul.plugin.base.cache.CommonPluginDataSubscriber.<init>(CommonPluginDataSubscriber.java:46) ~[classes/:na]
at org.dromara.soul.web.configuration.SoulConfiguration.pluginDataSubscriber(SoulConfiguration.java:97) ~[classes/:na]
不要紧张这些情况都是小case, 看到 CommonPluginDataSubscriber
这个类且看过我3期的分析文章 的同学, 应该会有种熟悉感… 不怕, 从头梳理下流程你就懂了.
首先 SoulConfiguration
配置类会加载一个叫做 pluginDataSubscriber
的 Bean, 用作插件数据的订阅, 当然订阅对象是后台管理系统:
public class SoulConfiguration {
@Bean
public PluginDataSubscriber pluginDataSubscriber(final ObjectProvider<List<PluginDataHandler>> pluginDataHandlerList) {
return new CommonPluginDataSubscriber(pluginDataHandlerList.getIfAvailable(Collections::emptyList));
}
}
这里的入参由来, 会借助 spring4.X 的机制获得所有父类为 PluginDataHandler
的实现类, 自然也会找到 CommonPluginDataSubscriber
:
public class CommonPluginDataSubscriber implements PluginDataSubscriber {
// ...
}
CommonPluginDataSubscriber
的构造器开始工作, 接收 pluginDataHandlerList
入参, 将这些 Bean 对象转换成一个缓存map:
private final Map<String, PluginDataHandler> handlerMap;
public CommonPluginDataSubscriber(final List<PluginDataHandler> pluginDataHandlerList) {
// 异常问题就是此行
this.handlerMap = pluginDataHandlerList.stream().collect(Collectors.toConcurrentMap(PluginDataHandler::pluginNamed, e -> e));
}
我们的异常就是在这里了, 构建hash时key重复了, 说明 pluginDataHandlerList
这个入参的 pluginName
属性有重复. 那么这个入参怎么来的呢? 自然是 spring注入的, 找到这些 Bean 即可.
答案就在我们网关中引入的 soul-spring-boot-starter-plugin-xx
项目中, 我现在不仅引入了 alibaba-dubbo
, 也引入了 apache-dubbo
, 这两个 PluginDataHandler
实现子类冲突了:
public class AlibabaDubboPluginDataHandler implements PluginDataHandler {
@Override
public String pluginNamed() {
return PluginEnum.DUBBO.getName();
}
}
public class ApacheDubboPluginDataHandler implements PluginDataHandler {
@Override
public String pluginNamed() {
return PluginEnum.DUBBO.getName();
}
}
怎么解决呢? 在网关中屏蔽调 soul-spring-boot-starter-plugin-alibaba-dubbo
这个依赖即可, 它的 AlibabaDubboPluginConfiguration
配置工厂不工作了自然不会有 AlibabaDubboPluginDataHandler
这个Bean了.
PS: 我挺想改了其中一个 pluginNamed()
返回值, 但鬼知道哪里会有这个枚举的使用, 不作死了.
正题
照旧找到 SoulWebHandler
的 execute()
方法, 看看调用的插件链长啥样:
plugins = {[email protected]} size = 11
0 = {[email protected]}
1 = {[email protected]}
2 = {[email protected]}
3 = {[email protected]}
4 = {[email protected]}
5 = {[email protected]}
6 = {[email protected]}
7 = {[email protected]}
8 = {[email protected]}
9 = {[email protected]}
10 = {[email protected]}
注意: 这里的 BodyParamPlugin
完整路径 org.dromara.soul.plugin.apache.dubbo.param
除了把 BodyParamPlugin
、AlibabaDubboPlugin
换成 Apache包里的 BodyParamPlugin
、ApacheDubboPlugin
, 就没啥区别了. 再具体看看 BodyParamPlugin
有什么不同:
public class BodyParamPlugin implements SoulPlugin {
// ..
@Override
public String named() {
return "apache-dubbo-body-param";
}
}
除了 named()
方法返回的字符串不一样, 没看出其他区别. 各种变量的类路径也都一样. 看看 ApacheDubboPlugin … 终于看出不一样了, 主要是调用的桩实现上不同 (仅保留核心代码):
public class ApacheDubboProxyService {
public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {
// ...
GenericService genericService = reference.get();
Pair<String[], Object[]> pair = new ImmutablePair<>(new String[]{}, new Object[]{});
// 使用 apache dubbo 的异步回调
CompletableFuture<Object> future = genericService.$invokeAsync(metaData.getMethodName(), pair.getLeft(), pair.getRight());
return Mono.fromFuture(future.thenApply(ret -> {
if (Objects.nonNull(ret)) {
exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, ret);
} else {
exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, Constants.DUBBO_RPC_RESULT_EMPTY);
}
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
return ret;
}));
}
}
ApacheDubboPlugin 对服务的调用是异步回调的模型 $invokeAsync()
, 不用阻塞线程等待结果. 而 AlibabaDubboPlugin 在这块的调用模型则是同步的 $invoke()
.
这块我其实挺有疑惑的, 没研究过 Apache-dubbo 和 Alibaba-dubbo, 不太清楚为什么一个支持$invokeAsync()
而另一个仅支持 $invoke()
. 今后会了解一二再出个番外篇.
上一篇: 揭秘:武则天真的害死亲姐姐了吗?