Dubbo源码阅读-DUBBO Adaptive
目录
1.概述
在 Dubbo 中,很多拓展都是通过 SPI 机制进行加载的,比如 Protocol、Cluster、LoadBalance 等。有时,有些拓展并不想在框架启动阶段被加载,而是希望在拓展方法被调用时,根据运行时参数进行加载。这听起来有些矛盾。拓展未被加载,那么拓展方法就无法被调用(静态方法除外)。拓展方法未被调用,拓展就无法被加载。对于这个矛盾的问题,Dubbo 通过自适应拓展机制很好的解决了。自适应拓展机制的实现逻辑比较复杂,首先 Dubbo 会为拓展接口生成具有代理功能的代码。然后通过 javassist 或 jdk 编译这段代码,得到 Class 类。最后再通过反射创建代理类,整个过程比较复杂。
@Adaptive标注一般标注在接口的方法上,也可以标注在类,枚举类,方法上,但是比较少见,@Adaptive标注在接口方法上时,要求接口方法入参中有一个URL,这个URL类是org.apache.dubbo.common.URL,注解的功能是可以根据方法中URL的入参,来选择对哪一个实现进行调用,使用如下:
1.1 示例
定义接口:
@SPI("dubbo")
public interface RpcAdaptiveExt {
@Adaptive({"p"})
String echo(String msg, URL url);
}
定义实现类:
/**
* @ClassName : DubboAdaptiveExt
* @Description :
* @Author : xueguolin
* @Date: 2020-11-02 20:22
*/
public class DubboAdaptiveExt implements RpcAdaptiveExt {
@Override
public String echo(String msg, URL url) {
return "dubbo";
}
}
/**
* @ClassName : SpringCloudAdaptiveExt
* @Description : spring cloud
* @Author : xueguolin
* @Date: 2020-11-02 20:23
*/
public class SpringCloudAdaptiveExt implements RpcAdaptiveExt {
@Override
public String echo(String msg, URL url) {
return "spring cloud";
}
}
/**
* @ClassName : ThriftAdaptiveExt
* @Description : thrift
* @Author : xueguolin
* @Date: 2020-11-02 20:23
*/
//@Adaptive
public class ThriftAdaptiveExt implements RpcAdaptiveExt {
@Override
public String echo(String msg, URL url) {
return "thrift";
}
}
Dubbo Adaptive 所需的配置文件需放置在 META-INF/dubbo/internal 路径下com.guolin.test.adaptive.RpcAdaptiveExt,配置内容如下:
dubbo = com.guolin.test.adaptive.impl.DubboAdaptiveExt
cloud = com.guolin.test.adaptive.impl.SpringCloudAdaptiveExt
thrift = com.guolin.test.adaptive.impl.ThriftAdaptiveExt
场景测试:
/**
* AdaptiveExt SPI注解 value 为空,通过url的test.adaptive.ext参数传值
* url的key=rpc.adaptive.ext(类名单词小写拆分)
* 输出:rpc.adaptive.ext的值
*/
@Test
public void test1() {
ExtensionLoader<RpcAdaptiveExt> loader = ExtensionLoader.getExtensionLoader(RpcAdaptiveExt.class);
RpcAdaptiveExt adaptiveExtension = loader.getAdaptiveExtension();
URL url = URL.valueOf("test://localhost/test?rpc.adaptive.ext=dubbo");
System.out.println(adaptiveExtension.echo("d", url));
}
/**
* SPI注解中有value值@SPI("dubbo")
* 输出:@SPI的值
*/
@Test
public void test2() {
ExtensionLoader<RpcAdaptiveExt> loader = ExtensionLoader.getExtensionLoader(RpcAdaptiveExt.class);
RpcAdaptiveExt adaptiveExtension = loader.getAdaptiveExtension();
URL url = URL.valueOf("test://localhost/test");
System.out.println(adaptiveExtension.echo("d", url));
}
/**
* SPI注解中有value值@SPI("dubbo"),URL中也有具体的值
* 输出:@SPI的值
*/
@Test
public void test3() {
ExtensionLoader<RpcAdaptiveExt> loader = ExtensionLoader.getExtensionLoader(RpcAdaptiveExt.class);
RpcAdaptiveExt adaptiveExtension = loader.getAdaptiveExtension();
URL url = URL.valueOf("test://localhost/test?rpc.adaptive.ext=cloud");
System.out.println(adaptiveExtension.echo("d", url));
}
/**
* SPI注解中有value值@SPI("dubbo"),URL中也有具体的值,实现类上面有
* 输出@Adaptive的值
*/
@Test
public void test4() {
ExtensionLoader<RpcAdaptiveExt> loader = ExtensionLoader.getExtensionLoader(RpcAdaptiveExt.class);
RpcAdaptiveExt adaptiveExtension = loader.getAdaptiveExtension();
URL url = URL.valueOf("test://localhost/test?rpc.adaptive.ext=cloud");
System.out.println(adaptiveExtension.echo("d", url));
}
/**
* SPI注解中有value值@SPI("dubbo"),URL中也有具体的值,注解中的value与链接中的参数的key一致, 实现类上面没有@Adaptive
* 输出:参数的key的值
*/
@Test
public void test5() {
ExtensionLoader<RpcAdaptiveExt> loader = ExtensionLoader.getExtensionLoader(RpcAdaptiveExt.class);
RpcAdaptiveExt adaptiveExtension = loader.getAdaptiveExtension();
URL url = URL.valueOf("test://localhost/test?p=cloud");
System.out.println(adaptiveExtension.echo("d", url));
}
2.源码
2.1 @Adaptive注解
在对自适应拓展生成过程进行深入分析之前,我们先来看一下与自适应拓展息息相关的一个注解,即 Adaptive 注解。
Adaptive 可注解在类或方法上。当 Adaptive 注解在类上时,Dubbo 不会为该类生成代理类。注解在方法(接口方法)上时,Dubbo 则会为该方法生成代理逻辑。Adaptive 注解在类上的情况很少,在 Dubbo 中,仅有两个类被 Adaptive 注解了,分别是 AdaptiveCompiler 和 AdaptiveExtensionFactory。此种情况,表示拓展的加载逻辑由人工编码完成。更多时候,Adaptive 是注解在接口方法上的,表示拓展的加载逻辑需由框架自动生成。Adaptive 注解的地方不同,相应的处理逻辑也是不同的。
/**
* Provide helpful information for {@link ExtensionLoader} to inject dependency extension instance.
*
* @see ExtensionLoader
* @see URL
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
String[] value() default {};
}
接下来我们看**解在接口方法上时的逻辑:
2.2 获取自适应拓展
ExtensionLoader#getAdaptiveExtension 方法是获取自适应拓展的入口方法:
@SuppressWarnings("unchecked")
public T getAdaptiveExtension() {
// 从缓存中获取自适应拓展
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError != null) {
throw new IllegalStateException("Failed to create adaptive instance: " +
createAdaptiveInstanceError.toString(),
createAdaptiveInstanceError);
}
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
// 创建自适应拓展
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
}
return (T) instance;
}
getAdaptiveExtension 方法首先会检查缓存,缓存未命中,则调用 createAdaptiveExtension 方法创建自适应拓展。下面,我们看一下 createAdaptiveExtension 方法的代码。
createAdaptiveExtension 方法的代码比较少,主要包含了三个逻辑,分别如下:
- 调用 getAdaptiveExtensionClass 方法获取自适应拓展 Class 对象
- 通过反射进行实例化
- 调用 injectExtension 方法向拓展实例中注入依赖
@SuppressWarnings("unchecked")
private T createAdaptiveExtension() {
try {
// 获取自适应拓展类,并通过反射实例化
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
getAdaptiveExtensionClass
getAdaptiveExtensionClass 方法包含了三个逻辑,如下:
- 调用 getExtensionClasses 获取所有的拓展类
- 检查缓存,若缓存不为空,则返回缓存
- 若缓存为空,则调用 createAdaptiveExtensionClass 创建自适应拓展类
private Class<?> getAdaptiveExtensionClass() {
// 通过 SPI 获取所有的拓展类
getExtensionClasses();
// 检查缓存,若缓存不为空,则直接返回缓存
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
// 创建自适应拓展类
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
上诉代码中主要的是第三步(createAdaptiveExtensionClass), createAdaptiveExtensionClass方法用于生成自适应拓展类,该方法首先会生成自适应拓展类的源码,然后通过 Compiler 实例(Dubbo 默认使用 javassist 作为编译器)编译源码,得到代理类 Class 实例。接下来,我们把重点放在代理类代码生成的逻辑上,其他逻辑大家自行分析。
private Class<?> createAdaptiveExtensionClass() {
// 拼接生成拓展类的代码字符串,用于下面编译器进行编译
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
2.3 自适应拓展类代码生成
/**
* generate and return class code
*/
public String generate() {
// no need to generate adaptive class since there's no adaptive method found.
if (!hasAdaptiveMethod()) {
throw new IllegalStateException("No adaptive method exist on extension " + type.getName() + ", refuse to create the adaptive class!");
}
StringBuilder code = new StringBuilder();
// 生成包名 package com.xxx;
code.append(generatePackageInfo());
// 生成import org.apache.dubbo.common.extension.ExtensionLoader;
code.append(generateImports());
// 生成类名 public class RpcAdaptiveExt$Adaptive implements com.guolin.test.adaptive.RpcAdaptiveExt
code.append(generateClassDeclaration());
// 生成方法:方法返回值、方法名、方法参数、方法体、异常等
Method[] methods = type.getMethods();
for (Method method : methods) {
code.append(generateMethod(method));
}
code.append("}");
if (logger.isDebugEnabled()) {
logger.debug(code.toString());
}
return code.toString();
}
AdaptiveClassCodeGenerator 类核心的方法为 generate(),核心逻辑拆分为如下:
2.3.1 Adaptive 注解检测
new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate()方法首先会判断方法是否包含@Adaptive注解
/**
* test if given type has at least one method annotated with <code>Adaptive</code>
*/
private boolean hasAdaptiveMethod() {
return Arrays.stream(type.getMethods()).anyMatch(m -> m.isAnnotationPresent(Adaptive.class));
}
2.3.2 生成类
// 生成包名 package com.xxx;
/**
* generate package info
*/
private String generatePackageInfo() {
return String.format(CODE_PACKAGE, type.getPackage().getName());
}
// 生成import org.apache.dubbo.common.extension.ExtensionLoader;
/**
* generate imports
*/
private String generateImports() {
return String.format(CODE_IMPORTS, ExtensionLoader.class.getName());
}
// 生成类名 public class RpcAdaptiveExt$Adaptive implements com.guolin.test.adaptive.RpcAdaptiveExt
/**
* generate class declaration
*/
private String generateClassDeclaration() {
return String.format(CODE_CLASS_DECLARATION, type.getSimpleName(), type.getCanonicalName());
}
2.3.3 生成方法
// 生成方法:方法返回值、方法名、方法参数、方法体、异常等
Method[] methods = type.getMethods();
for (Method method : methods) {
code.append(generateMethod(method));
}
遍历所有方法,调用generateMethod(method)生成方法,generateMethod(method)逻辑如下:
/**
* generate method declaration
*/
private String generateMethod(Method method) {
// 方法返回类型:java.lang.String
String methodReturnType = method.getReturnType().getCanonicalName();
// 方法名称
String methodName = method.getName();
// 方法体
String methodContent = generateMethodContent(method);
// 方法参数,例如:java.lang.String arg0, org.apache.dubbo.common.URL arg1
String methodArgs = generateMethodArguments(method);
// 方法抛异常throws %s
String methodThrows = generateMethodThrows(method);
return String.format(CODE_METHOD_DECLARATION, methodReturnType, methodName, methodArgs, methodThrows, methodContent);
}
其中主要的逻辑在生成方法体, generateMethodContent(method),具体逻辑如下:
/**
* generate method content
*/
private String generateMethodContent(Method method) {
Adaptive adaptiveAnnotation = method.getAnnotation(Adaptive.class);
StringBuilder code = new StringBuilder(512);
if (adaptiveAnnotation == null) {
// 没有@Adaptive注解,抛异常
return generateUnsupported(method);
} else {
int urlTypeIndex = getUrlTypeIndex(method);
// 如果方法的URL参数不为空,做空指针判断
// found parameter in URL type
if (urlTypeIndex != -1) {
// Null Point check
// if (arg1 == null) {
// throw new IllegalArgumentException("url == null");
// }
code.append(generateUrlNullCheck(urlTypeIndex));
}
// 如果方法没有URL参数,并且找不到getUrl方法,则抛异常,如果有getUrl方法,则做空指针判断
else {
// did not find parameter in URL type
code.append(generateUrlAssignmentIndirectly(method));
}
// 获取@Adaptive注解的参数,例如:@Adaptive({"p"}),参数=p
String[] value = getMethodAdaptiveValue(adaptiveAnnotation);
// 判断是不是有org.apache.dubbo.rpc.Invocation参数
boolean hasInvocation = hasInvocationArgument(method);
// org.apache.dubbo.rpc.Invocation参数空判断
code.append(generateInvocationArgumentNullCheck(method));
// 拼接拓展,生成的代码功能等价于下面的代码 : String extName = url.getParameter(value[i], getNameCode)
code.append(generateExtNameAssignment(value, hasInvocation));
// check extName == null?
code.append(generateExtNameNullCheck(value));
// 获取拓展: com.xxx extension = (com.xxx) ExtensionLoader.getExtensionLoader(com.xxx.class).getExtension(extName);
code.append(generateExtensionAssignment());
// return statement ,返回并执行目标方法,例如:extension.echo(arg0, arg1);
code.append(generateReturnAndInvocation(method));
}
return code.toString();
}
其中拼接拓展代码段,逻辑如下(生成的代码功能等价于下面的代码 : String extName = url.getParameter(value[i], getNameCode) ):
/**
* generate extName assigment code
*/
private String generateExtNameAssignment(String[] value, boolean hasInvocation) {
// TODO: refactor it
String getNameCode = null;
for (int i = value.length - 1; i >= 0; --i) {
if (i == value.length - 1) {
if (null != defaultExtName) {
if (!"protocol".equals(value[i])) {
if (hasInvocation) {
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
} else {
// 生成的代码功能等价于下面的代码:
// url.getParameter(value[i], getNameCode)
getNameCode = String.format("url.getParameter(\"%s\", \"%s\")", value[i], defaultExtName);
}
} else {
// 生成的代码功能等价于下面的代码:
// url.getProtocol() == null ? getNameCode : url.getProtocol()
// 以 Protocol 接口的 connect 方法为例,最终生成的代码如下:
// url.getProtocol() == null ? "dubbo" : url.getProtocol()
getNameCode = String.format("( url.getProtocol() == null ? \"%s\" : url.getProtocol() )", defaultExtName);
}
} else {
if (!"protocol".equals(value[i])) {
if (hasInvocation) {
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
} else {
getNameCode = String.format("url.getParameter(\"%s\")", value[i]);
}
} else {
getNameCode = "url.getProtocol()";
}
}
} else {
if (!"protocol".equals(value[i])) {
if (hasInvocation) {
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
} else {
getNameCode = String.format("url.getParameter(\"%s\", %s)", value[i], getNameCode);
}
} else {
getNameCode = String.format("url.getProtocol() == null ? (%s) : url.getProtocol()", getNameCode);
}
}
}
return String.format(CODE_EXT_NAME_ASSIGNMENT, getNameCode);
}
最后生成的代码如下:
ExtensionLoader.getExtensionLoader(RpcAdaptiveExt.class).getAdaptiveExtension() 自适应生成实现类如下:
package com.guolin.test.adaptive;
import org.apache.dubbo.common.extension.ExtensionLoader;
/**
* @ClassName : RpcAdaptiveExt$Adaptive
* @Description : @Adaptive生成的包装类
* @Author : xueguolin
* @Date: 2020-11-03 14:20
*/
public class RpcAdaptiveExt$Adaptive implements com.guolin.test.adaptive.RpcAdaptiveExt {
public java.lang.String echo(java.lang.String arg0,
org.apache.dubbo.common.URL arg1) {
if (arg1 == null) {
throw new IllegalArgumentException("url == null");
}
org.apache.dubbo.common.URL url = arg1;
String extName = url.getParameter("p", "dubbo");
if (extName == null) {
throw new IllegalStateException(
"Failed to get extension (com.guolin.test.adaptive.RpcAdaptiveExt) name from url (" +
url.toString() + ") use keys([p])");
}
com.guolin.test.adaptive.RpcAdaptiveExt extension = (com.guolin.test.adaptive.RpcAdaptiveExt) ExtensionLoader.getExtensionLoader(com.guolin.test.adaptive.RpcAdaptiveExt.class)
.getExtension(extName);
return extension.echo(arg0, arg1);
}
}
2.4 编译
编译接口:
/**
* Compiler. (SPI, Singleton, ThreadSafe)
*/
@SPI("javassist")
public interface Compiler {
/**
* Compile java source code.
*
* @param code Java source code
* @param classLoader classloader
* @return Compiled class
*/
Class<?> compile(String code, ClassLoader classLoader);
}
抽象实现:
@Override
public Class<?> compile(String code, ClassLoader classLoader) {
code = code.trim();
Matcher matcher = PACKAGE_PATTERN.matcher(code);
String pkg;
if (matcher.find()) {
pkg = matcher.group(1);
} else {
pkg = "";
}
matcher = CLASS_PATTERN.matcher(code);
String cls;
if (matcher.find()) {
cls = matcher.group(1);
} else {
throw new IllegalArgumentException("No such class name in " + code);
}
String className = pkg != null && pkg.length() > 0 ? pkg + "." + cls : cls;
try {
return Class.forName(className, true, org.apache.dubbo.common.utils.ClassUtils.getCallerClassLoader(getClass()));
} catch (ClassNotFoundException e) {
if (!code.endsWith("}")) {
throw new IllegalStateException("The java code not endsWith \"}\", code: \n" + code + "\n");
}
try {
return doCompile(className, code);
} catch (RuntimeException t) {
throw t;
} catch (Throwable t) {
throw new IllegalStateException("Failed to compile class, cause: " + t.getMessage() + ", class: " + className + ", code: \n" + code + "\n, stack: " + ClassUtils.toString(t));
}
}
}
protected abstract Class<?> doCompile(String name, String source) throws Throwable;
JavassistCompiler.java
/**
* JavassistCompiler. (SPI, Singleton, ThreadSafe)
*/
public class JavassistCompiler extends AbstractCompiler {
private static final Pattern IMPORT_PATTERN = Pattern.compile("import\\s+([\\w\\.\\*]+);\n");
private static final Pattern EXTENDS_PATTERN = Pattern.compile("\\s+extends\\s+([\\w\\.]+)[^\\{]*\\{\n");
private static final Pattern IMPLEMENTS_PATTERN = Pattern.compile("\\s+implements\\s+([\\w\\.]+)\\s*\\{\n");
private static final Pattern METHODS_PATTERN = Pattern.compile("\n(private|public|protected)\\s+");
private static final Pattern FIELD_PATTERN = Pattern.compile("[^\n]+=[^\n]+;");
@Override
public Class<?> doCompile(String name, String source) throws Throwable {
CtClassBuilder builder = new CtClassBuilder();
builder.setClassName(name);
// process imported classes
Matcher matcher = IMPORT_PATTERN.matcher(source);
while (matcher.find()) {
builder.addImports(matcher.group(1).trim());
}
// process extended super class
matcher = EXTENDS_PATTERN.matcher(source);
if (matcher.find()) {
builder.setSuperClassName(matcher.group(1).trim());
}
// process implemented interfaces
matcher = IMPLEMENTS_PATTERN.matcher(source);
if (matcher.find()) {
String[] ifaces = matcher.group(1).trim().split("\\,");
Arrays.stream(ifaces).forEach(i -> builder.addInterface(i.trim()));
}
// process constructors, fields, methods
String body = source.substring(source.indexOf('{') + 1, source.length() - 1);
String[] methods = METHODS_PATTERN.split(body);
String className = ClassUtils.getSimpleClassName(name);
Arrays.stream(methods).map(String::trim).filter(m -> !m.isEmpty()).forEach(method -> {
if (method.startsWith(className)) {
builder.addConstructor("public " + method);
} else if (FIELD_PATTERN.matcher(method).matches()) {
builder.addField("private " + method);
} else {
builder.addMethod("public " + method);
}
});
// compile
ClassLoader classLoader = org.apache.dubbo.common.utils.ClassUtils.getCallerClassLoader(getClass());
CtClass cls = builder.build(classLoader);
return cls.toClass(classLoader, JavassistCompiler.class.getProtectionDomain());
}
}
3.总结
关于自适应拓展的原理,总的来说自适应拓展整个逻辑主要在于:拼接代码生成代理类,最后编译。
4.参考文章
《dubbo spi :http://dubbo.apache.org/zh-cn/docs/source_code_guide/dubbo-spi.html》
5.代码地址
下一篇: 【Dubbo】Adaptive