欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Dubbo源码阅读-DUBBO Adaptive

程序员文章站 2022-06-01 19:51:59
...

目录

1.概述

1.1 示例

2.源码

2.1 @Adaptive注解

2.2 获取自适应拓展

2.3 自适应拓展类代码生成

2.3.1 Adaptive 注解检测

2.3.2 生成类

2.3.3 生成方法

2.4 编译

3.总结

4.参考文章

5.代码地址


1.概述

在 Dubbo 中,很多拓展都是通过 SPI 机制进行加载的,比如 Protocol、Cluster、LoadBalance 等。有时,有些拓展并不想在框架启动阶段被加载,而是希望在拓展方法被调用时,根据运行时参数进行加载。这听起来有些矛盾。拓展未被加载,那么拓展方法就无法被调用(静态方法除外)。拓展方法未被调用,拓展就无法被加载。对于这个矛盾的问题,Dubbo 通过自适应拓展机制很好的解决了。自适应拓展机制的实现逻辑比较复杂,首先 Dubbo 会为拓展接口生成具有代理功能的代码。然后通过 javassist 或 jdk 编译这段代码,得到 Class 类。最后再通过反射创建代理类,整个过程比较复杂。

@Adaptive标注一般标注在接口的方法上,也可以标注在类,枚举类,方法上,但是比较少见,@Adaptive标注在接口方法上时,要求接口方法入参中有一个URL,这个URL类是org.apache.dubbo.common.URL注解的功能是可以根据方法中URL的入参,来选择对哪一个实现进行调用,使用如下:

1.1 示例

定义接口:


@SPI("dubbo")
public interface RpcAdaptiveExt {

    @Adaptive({"p"})
    String echo(String msg, URL url);
}

定义实现类:


/**
 * @ClassName : DubboAdaptiveExt
 * @Description :
 * @Author : xueguolin
 * @Date: 2020-11-02 20:22
 */
public class DubboAdaptiveExt implements RpcAdaptiveExt {
    @Override
    public String echo(String msg, URL url) {
        return "dubbo";
    }
}


/**
 * @ClassName : SpringCloudAdaptiveExt
 * @Description : spring cloud
 * @Author : xueguolin
 * @Date: 2020-11-02 20:23
 */
public class SpringCloudAdaptiveExt implements RpcAdaptiveExt {
    @Override
    public String echo(String msg, URL url) {
        return "spring cloud";
    }
}


/**
 * @ClassName : ThriftAdaptiveExt
 * @Description : thrift
 * @Author : xueguolin
 * @Date: 2020-11-02 20:23
 */
//@Adaptive
public class ThriftAdaptiveExt implements RpcAdaptiveExt {
    @Override
    public String echo(String msg, URL url) {
        return "thrift";
    }
}

Dubbo Adaptive 所需的配置文件需放置在 META-INF/dubbo/internal 路径下com.guolin.test.adaptive.RpcAdaptiveExt,配置内容如下:

dubbo = com.guolin.test.adaptive.impl.DubboAdaptiveExt
cloud = com.guolin.test.adaptive.impl.SpringCloudAdaptiveExt
thrift = com.guolin.test.adaptive.impl.ThriftAdaptiveExt

场景测试:

/**
     * AdaptiveExt SPI注解 value 为空,通过url的test.adaptive.ext参数传值
     * url的key=rpc.adaptive.ext(类名单词小写拆分)
     * 输出:rpc.adaptive.ext的值
     */
    @Test
    public void test1() {
        ExtensionLoader<RpcAdaptiveExt> loader = ExtensionLoader.getExtensionLoader(RpcAdaptiveExt.class);
        RpcAdaptiveExt adaptiveExtension = loader.getAdaptiveExtension();
        URL url = URL.valueOf("test://localhost/test?rpc.adaptive.ext=dubbo");
        System.out.println(adaptiveExtension.echo("d", url));
    }

    /**
     * SPI注解中有value值@SPI("dubbo")
     * 输出:@SPI的值
     */
    @Test
    public void test2() {
        ExtensionLoader<RpcAdaptiveExt> loader = ExtensionLoader.getExtensionLoader(RpcAdaptiveExt.class);
        RpcAdaptiveExt adaptiveExtension = loader.getAdaptiveExtension();
        URL url = URL.valueOf("test://localhost/test");
        System.out.println(adaptiveExtension.echo("d", url));
    }

    /**
     * SPI注解中有value值@SPI("dubbo"),URL中也有具体的值
     * 输出:@SPI的值
     */
    @Test
    public void test3() {
        ExtensionLoader<RpcAdaptiveExt> loader = ExtensionLoader.getExtensionLoader(RpcAdaptiveExt.class);
        RpcAdaptiveExt adaptiveExtension = loader.getAdaptiveExtension();
        URL url = URL.valueOf("test://localhost/test?rpc.adaptive.ext=cloud");
        System.out.println(adaptiveExtension.echo("d", url));
    }

    /**
     * SPI注解中有value值@SPI("dubbo"),URL中也有具体的值,实现类上面有
     * 输出@Adaptive的值
     */
    @Test
    public void test4() {
        ExtensionLoader<RpcAdaptiveExt> loader = ExtensionLoader.getExtensionLoader(RpcAdaptiveExt.class);
        RpcAdaptiveExt adaptiveExtension = loader.getAdaptiveExtension();
        URL url = URL.valueOf("test://localhost/test?rpc.adaptive.ext=cloud");
        System.out.println(adaptiveExtension.echo("d", url));
    }

    /**
     * SPI注解中有value值@SPI("dubbo"),URL中也有具体的值,注解中的value与链接中的参数的key一致, 实现类上面没有@Adaptive
     * 输出:参数的key的值
     */
    @Test
    public void test5() {
        ExtensionLoader<RpcAdaptiveExt> loader = ExtensionLoader.getExtensionLoader(RpcAdaptiveExt.class);
        RpcAdaptiveExt adaptiveExtension = loader.getAdaptiveExtension();
        URL url = URL.valueOf("test://localhost/test?p=cloud");
        System.out.println(adaptiveExtension.echo("d", url));
    }

 

2.源码

2.1 @Adaptive注解

在对自适应拓展生成过程进行深入分析之前,我们先来看一下与自适应拓展息息相关的一个注解,即 Adaptive 注解。

Adaptive 可注解在类或方法上。当 Adaptive 注解在类上时,Dubbo 不会为该类生成代理类。注解在方法(接口方法)上时,Dubbo 则会为该方法生成代理逻辑。Adaptive 注解在类上的情况很少,在 Dubbo 中,仅有两个类被 Adaptive 注解了,分别是 AdaptiveCompiler 和 AdaptiveExtensionFactory。此种情况,表示拓展的加载逻辑由人工编码完成。更多时候,Adaptive 是注解在接口方法上的,表示拓展的加载逻辑需由框架自动生成。Adaptive 注解的地方不同,相应的处理逻辑也是不同的。


/**
 * Provide helpful information for {@link ExtensionLoader} to inject dependency extension instance.
 *
 * @see ExtensionLoader
 * @see URL
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
    String[] value() default {};

}

接下来我们看**解在接口方法上时的逻辑:

2.2 获取自适应拓展

ExtensionLoader#getAdaptiveExtension 方法是获取自适应拓展的入口方法:


    @SuppressWarnings("unchecked")
    public T getAdaptiveExtension() {
        // 从缓存中获取自适应拓展
        Object instance = cachedAdaptiveInstance.get();
        if (instance == null) {
            if (createAdaptiveInstanceError != null) {
                throw new IllegalStateException("Failed to create adaptive instance: " +
                        createAdaptiveInstanceError.toString(),
                        createAdaptiveInstanceError);
            }

            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {
                        // 创建自适应拓展
                        instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                    }
                }
            }
        }

        return (T) instance;
    }

getAdaptiveExtension 方法首先会检查缓存,缓存未命中,则调用 createAdaptiveExtension 方法创建自适应拓展。下面,我们看一下 createAdaptiveExtension 方法的代码。

createAdaptiveExtension 方法的代码比较少,主要包含了三个逻辑,分别如下:

  1. 调用 getAdaptiveExtensionClass 方法获取自适应拓展 Class 对象
  2. 通过反射进行实例化
  3. 调用 injectExtension 方法向拓展实例中注入依赖

    @SuppressWarnings("unchecked")
    private T createAdaptiveExtension() {
        try {
            // 获取自适应拓展类,并通过反射实例化
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
        }
    }

getAdaptiveExtensionClass 

getAdaptiveExtensionClass 方法包含了三个逻辑,如下:

  1. 调用 getExtensionClasses 获取所有的拓展类
  2. 检查缓存,若缓存不为空,则返回缓存
  3. 若缓存为空,则调用 createAdaptiveExtensionClass 创建自适应拓展类

    private Class<?> getAdaptiveExtensionClass() {
// 通过 SPI 获取所有的拓展类
        getExtensionClasses();
// 检查缓存,若缓存不为空,则直接返回缓存
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
// 创建自适应拓展类
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }

上诉代码中主要的是第三步(createAdaptiveExtensionClass), createAdaptiveExtensionClass方法用于生成自适应拓展类,该方法首先会生成自适应拓展类的源码,然后通过 Compiler 实例(Dubbo 默认使用 javassist 作为编译器)编译源码,得到代理类 Class 实例。接下来,我们把重点放在代理类代码生成的逻辑上,其他逻辑大家自行分析。


    private Class<?> createAdaptiveExtensionClass() {
        // 拼接生成拓展类的代码字符串,用于下面编译器进行编译
        String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
        ClassLoader classLoader = findClassLoader();
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
    }

 

2.3 自适应拓展类代码生成


    /**
     * generate and return class code
     */
    public String generate() {
        // no need to generate adaptive class since there's no adaptive method found.
        if (!hasAdaptiveMethod()) {
            throw new IllegalStateException("No adaptive method exist on extension " + type.getName() + ", refuse to create the adaptive class!");
        }

        StringBuilder code = new StringBuilder();
        // 生成包名 package com.xxx;
        code.append(generatePackageInfo());
        // 生成import org.apache.dubbo.common.extension.ExtensionLoader;
        code.append(generateImports());
        // 生成类名 public class RpcAdaptiveExt$Adaptive implements com.guolin.test.adaptive.RpcAdaptiveExt
        code.append(generateClassDeclaration());
        // 生成方法:方法返回值、方法名、方法参数、方法体、异常等
        Method[] methods = type.getMethods();
        for (Method method : methods) {
            code.append(generateMethod(method));
        }
        code.append("}");

        if (logger.isDebugEnabled()) {
            logger.debug(code.toString());
        }
        return code.toString();
    }

AdaptiveClassCodeGenerator 类核心的方法为 generate(),核心逻辑拆分为如下:

2.3.1 Adaptive 注解检测

new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate()方法首先会判断方法是否包含@Adaptive注解


    /**
     * test if given type has at least one method annotated with <code>Adaptive</code>
     */
    private boolean hasAdaptiveMethod() {
        return Arrays.stream(type.getMethods()).anyMatch(m -> m.isAnnotationPresent(Adaptive.class));
    }

2.3.2 生成类

 // 生成包名 package com.xxx;
    /**
     * generate package info
     */
    private String generatePackageInfo() {
        return String.format(CODE_PACKAGE, type.getPackage().getName());
    }

// 生成import org.apache.dubbo.common.extension.ExtensionLoader;
    /**
     * generate imports
     */
    private String generateImports() {
        return String.format(CODE_IMPORTS, ExtensionLoader.class.getName());
    }


// 生成类名 public class RpcAdaptiveExt$Adaptive implements com.guolin.test.adaptive.RpcAdaptiveExt
    /**
     * generate class declaration
     */
    private String generateClassDeclaration() {
        return String.format(CODE_CLASS_DECLARATION, type.getSimpleName(), type.getCanonicalName());
    }




2.3.3 生成方法


        // 生成方法:方法返回值、方法名、方法参数、方法体、异常等
        Method[] methods = type.getMethods();
        for (Method method : methods) {
            code.append(generateMethod(method));
        }

遍历所有方法,调用generateMethod(method)生成方法,generateMethod(method)逻辑如下:


    /**
     * generate method declaration
     */
    private String generateMethod(Method method) {
        // 方法返回类型:java.lang.String
        String methodReturnType = method.getReturnType().getCanonicalName();
        // 方法名称
        String methodName = method.getName();
        // 方法体
        String methodContent = generateMethodContent(method);
        // 方法参数,例如:java.lang.String arg0, org.apache.dubbo.common.URL arg1
        String methodArgs = generateMethodArguments(method);
        // 方法抛异常throws %s
        String methodThrows = generateMethodThrows(method);
        return String.format(CODE_METHOD_DECLARATION, methodReturnType, methodName, methodArgs, methodThrows, methodContent);
    }

其中主要的逻辑在生成方法体, generateMethodContent(method),具体逻辑如下:


    /**
     * generate method content
     */
    private String generateMethodContent(Method method) {
        Adaptive adaptiveAnnotation = method.getAnnotation(Adaptive.class);
        StringBuilder code = new StringBuilder(512);
        if (adaptiveAnnotation == null) {
            // 没有@Adaptive注解,抛异常
            return generateUnsupported(method);
        } else {
            int urlTypeIndex = getUrlTypeIndex(method);
            // 如果方法的URL参数不为空,做空指针判断
            // found parameter in URL type
            if (urlTypeIndex != -1) {
                // Null Point check
                //   if (arg1 == null) {
                //            throw new IllegalArgumentException("url == null");
                //        }
                code.append(generateUrlNullCheck(urlTypeIndex));
            }
            // 如果方法没有URL参数,并且找不到getUrl方法,则抛异常,如果有getUrl方法,则做空指针判断
            else {
                // did not find parameter in URL type
                code.append(generateUrlAssignmentIndirectly(method));
            }
            // 获取@Adaptive注解的参数,例如:@Adaptive({"p"}),参数=p
            String[] value = getMethodAdaptiveValue(adaptiveAnnotation);
            // 判断是不是有org.apache.dubbo.rpc.Invocation参数
            boolean hasInvocation = hasInvocationArgument(method);
            // org.apache.dubbo.rpc.Invocation参数空判断
            code.append(generateInvocationArgumentNullCheck(method));
            // 拼接拓展,生成的代码功能等价于下面的代码 : String extName = url.getParameter(value[i], getNameCode)
            code.append(generateExtNameAssignment(value, hasInvocation));
            // check extName == null?
            code.append(generateExtNameNullCheck(value));
            // 获取拓展: com.xxx extension = (com.xxx) ExtensionLoader.getExtensionLoader(com.xxx.class).getExtension(extName);
            code.append(generateExtensionAssignment());

            // return statement ,返回并执行目标方法,例如:extension.echo(arg0, arg1);
            code.append(generateReturnAndInvocation(method));
        }

        return code.toString();
    }

其中拼接拓展代码段,逻辑如下(生成的代码功能等价于下面的代码 : String extName = url.getParameter(value[i], getNameCode) ):


    /**
     * generate extName assigment code
     */
    private String generateExtNameAssignment(String[] value, boolean hasInvocation) {
        // TODO: refactor it
        String getNameCode = null;
        for (int i = value.length - 1; i >= 0; --i) {
            if (i == value.length - 1) {
                if (null != defaultExtName) {
                    if (!"protocol".equals(value[i])) {
                        if (hasInvocation) {
                            getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
                        } else {
                            // 生成的代码功能等价于下面的代码:
                            //   url.getParameter(value[i], getNameCode)
                            getNameCode = String.format("url.getParameter(\"%s\", \"%s\")", value[i], defaultExtName);
                        }
                    } else {
                        // 生成的代码功能等价于下面的代码:
                        //   url.getProtocol() == null ? getNameCode : url.getProtocol()
                        // 以 Protocol 接口的 connect 方法为例,最终生成的代码如下:
                        //   url.getProtocol() == null ? "dubbo" : url.getProtocol()
                        getNameCode = String.format("( url.getProtocol() == null ? \"%s\" : url.getProtocol() )", defaultExtName);
                    }
                } else {
                    if (!"protocol".equals(value[i])) {
                        if (hasInvocation) {
                            getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
                        } else {
                            getNameCode = String.format("url.getParameter(\"%s\")", value[i]);
                        }
                    } else {
                        getNameCode = "url.getProtocol()";
                    }
                }
            } else {
                if (!"protocol".equals(value[i])) {
                    if (hasInvocation) {
                        getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
                    } else {
                        getNameCode = String.format("url.getParameter(\"%s\", %s)", value[i], getNameCode);
                    }
                } else {
                    getNameCode = String.format("url.getProtocol() == null ? (%s) : url.getProtocol()", getNameCode);
                }
            }
        }

        return String.format(CODE_EXT_NAME_ASSIGNMENT, getNameCode);
    }

最后生成的代码如下:

ExtensionLoader.getExtensionLoader(RpcAdaptiveExt.class).getAdaptiveExtension() 自适应生成实现类如下:

package com.guolin.test.adaptive;
        
import org.apache.dubbo.common.extension.ExtensionLoader;


/**
 * @ClassName : RpcAdaptiveExt$Adaptive
 * @Description : @Adaptive生成的包装类
 * @Author : xueguolin
 * @Date: 2020-11-03 14:20
 */
public class RpcAdaptiveExt$Adaptive implements com.guolin.test.adaptive.RpcAdaptiveExt {
    public java.lang.String echo(java.lang.String arg0,
                                 org.apache.dubbo.common.URL arg1) {
        if (arg1 == null) {
            throw new IllegalArgumentException("url == null");
        }

        org.apache.dubbo.common.URL url = arg1;
        String extName = url.getParameter("p", "dubbo");

        if (extName == null) {
            throw new IllegalStateException(
                    "Failed to get extension (com.guolin.test.adaptive.RpcAdaptiveExt) name from url (" +
                            url.toString() + ") use keys([p])");
        }

        com.guolin.test.adaptive.RpcAdaptiveExt extension = (com.guolin.test.adaptive.RpcAdaptiveExt) ExtensionLoader.getExtensionLoader(com.guolin.test.adaptive.RpcAdaptiveExt.class)
                .getExtension(extName);

        return extension.echo(arg0, arg1);
    }
}

2.4 编译

编译接口:


/**
 * Compiler. (SPI, Singleton, ThreadSafe)
 */
@SPI("javassist")
public interface Compiler {

    /**
     * Compile java source code.
     *
     * @param code        Java source code
     * @param classLoader classloader
     * @return Compiled class
     */
    Class<?> compile(String code, ClassLoader classLoader);

}

抽象实现:


    @Override
    public Class<?> compile(String code, ClassLoader classLoader) {
        code = code.trim();
        Matcher matcher = PACKAGE_PATTERN.matcher(code);
        String pkg;
        if (matcher.find()) {
            pkg = matcher.group(1);
        } else {
            pkg = "";
        }
        matcher = CLASS_PATTERN.matcher(code);
        String cls;
        if (matcher.find()) {
            cls = matcher.group(1);
        } else {
            throw new IllegalArgumentException("No such class name in " + code);
        }
        String className = pkg != null && pkg.length() > 0 ? pkg + "." + cls : cls;
        try {
            return Class.forName(className, true, org.apache.dubbo.common.utils.ClassUtils.getCallerClassLoader(getClass()));
        } catch (ClassNotFoundException e) {
            if (!code.endsWith("}")) {
                throw new IllegalStateException("The java code not endsWith \"}\", code: \n" + code + "\n");
            }
            try {
                return doCompile(className, code);
            } catch (RuntimeException t) {
                throw t;
            } catch (Throwable t) {
                throw new IllegalStateException("Failed to compile class, cause: " + t.getMessage() + ", class: " + className + ", code: \n" + code + "\n, stack: " + ClassUtils.toString(t));
            }
        }
    }

    protected abstract Class<?> doCompile(String name, String source) throws Throwable;

JavassistCompiler.java


/**
 * JavassistCompiler. (SPI, Singleton, ThreadSafe)
 */
public class JavassistCompiler extends AbstractCompiler {

    private static final Pattern IMPORT_PATTERN = Pattern.compile("import\\s+([\\w\\.\\*]+);\n");

    private static final Pattern EXTENDS_PATTERN = Pattern.compile("\\s+extends\\s+([\\w\\.]+)[^\\{]*\\{\n");

    private static final Pattern IMPLEMENTS_PATTERN = Pattern.compile("\\s+implements\\s+([\\w\\.]+)\\s*\\{\n");

    private static final Pattern METHODS_PATTERN = Pattern.compile("\n(private|public|protected)\\s+");

    private static final Pattern FIELD_PATTERN = Pattern.compile("[^\n]+=[^\n]+;");

    @Override
    public Class<?> doCompile(String name, String source) throws Throwable {
        CtClassBuilder builder = new CtClassBuilder();
        builder.setClassName(name);

        // process imported classes
        Matcher matcher = IMPORT_PATTERN.matcher(source);
        while (matcher.find()) {
            builder.addImports(matcher.group(1).trim());
        }

        // process extended super class
        matcher = EXTENDS_PATTERN.matcher(source);
        if (matcher.find()) {
            builder.setSuperClassName(matcher.group(1).trim());
        }

        // process implemented interfaces
        matcher = IMPLEMENTS_PATTERN.matcher(source);
        if (matcher.find()) {
            String[] ifaces = matcher.group(1).trim().split("\\,");
            Arrays.stream(ifaces).forEach(i -> builder.addInterface(i.trim()));
        }

        // process constructors, fields, methods
        String body = source.substring(source.indexOf('{') + 1, source.length() - 1);
        String[] methods = METHODS_PATTERN.split(body);
        String className = ClassUtils.getSimpleClassName(name);
        Arrays.stream(methods).map(String::trim).filter(m -> !m.isEmpty()).forEach(method -> {
            if (method.startsWith(className)) {
                builder.addConstructor("public " + method);
            } else if (FIELD_PATTERN.matcher(method).matches()) {
                builder.addField("private " + method);
            } else {
                builder.addMethod("public " + method);
            }
        });

        // compile
        ClassLoader classLoader = org.apache.dubbo.common.utils.ClassUtils.getCallerClassLoader(getClass());
        CtClass cls = builder.build(classLoader);
        return cls.toClass(classLoader, JavassistCompiler.class.getProtectionDomain());
    }

}

3.总结

关于自适应拓展的原理,总的来说自适应拓展整个逻辑主要在于:拼接代码生成代理类,最后编译。

4.参考文章

《dubbo spi :http://dubbo.apache.org/zh-cn/docs/source_code_guide/dubbo-spi.html

5.代码地址

https://gitee.com/shandadadada/spi_demo

相关标签: dubbo