欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Dubbo源码学习

程序员文章站 2024-01-27 12:04:10
...

Dubbo源码学习

一、概述

Dubbo是一款高性能、轻量级基于Java的RPC开源框架。平时使用的非常多。但仅仅使用很难了解背后的原理,更不用提经常出现在面试中,这篇文章主要从源码的角度解析 dubbo 比较重要的三个模块:服务导出、服务引入、负载均衡。还有很多其他重要模块如:服务路由、消费者集群、通信协议、服务治理等等,由于篇幅原因将在以后的其他博客分享。

1.1 主要组件

Dubbo 在使用上有5个主要的组成部分:
Dubbo源码学习

  • Provider: 服务提供方
  • Consumer: 服务调用方
  • Registry: 服务注册与发现的注册中心
  • Monitor: 统计服务的调用次数和调用时间的监控中心
  • Container: 服务运行容器
1.2 项目整体设计

Dubbo源码学习

  • 左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口
  • 图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,其中,Service 和 Config 层为 API,其它各层均为 SPI
  • 图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类
  • 蓝色虚线为初始化过程,即启动时组装链
  • 红色实线为方法调用过程,即运行时调时链
  • 紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法
  • 主要的层级和说明
    • config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类
    • proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
    • registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
    • cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance
    • protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
    • transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
    • serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool
1.3 源码模块

源码地址:https://github.com/apache/dubbo.git
主要模块与说明:

  1. dubbo-registry——注册中心模块:各种注册中心(Multicast、Zookeeper、Redis、Simple)的实现和注册中心下发地址的实现。
  2. dubbo-cluster——集群模块:集群的抽象,以及 集群配置、负载均衡、容错、路由 的实现,集群的地址列表可以是静态配置的,也可以是由注册中心下发。
  3. dubbo-common——公共逻辑模块:各种工具类和通用的模型。serialize 层放在 common 模块中,以便更大程度复用。
  4. dubbo-config——配置模块:Dubbo 对外的 API,用户通过 Config 使用Dubbo,隐藏 Dubbo 所有细节,Dubbo的4种配置方式:XML配置、属性配置、API配置、注解配置
  5. dubbo-rpc——远程调用模块:抽象各种协议,以及动态代理,只包含一对一的调用,不关心集群的管理。protocol 层和 proxy 层都放在 rpc 模块中,这两层是 rpc 的核心,在不需要集群也就是只有一个提供者时,可以只使用这两层完成 rpc 调用。
  6. dubbo-remoting——远程通信模块:Dubbo 协议的实现,提供了多种客户端和服务端通信功能,比如基于Grizzly、Netty、Tomcat等等,RPC用除了RMI的协议都要用到此模块。transport 层和 exchange 层都放在 remoting 模块中,为 rpc 调用的通讯基础
  7. dubbo-container——容器模块:一个 Standlone 的容器,以简单的 Main 加载 Spring 启动,因为后台服务并不需要Tomcat/JBoss 等Web 容器的特性,没必要用 Web 容器去加载服务
  8. dubbo-monitor——监控模块:统计服务调用次数,调用时间的,调用链跟踪的服务。

其他模块与说明

  1. dubbo-bootstrap——清理模块:这个模块只有一个类,是作为dubbo的引导类,并且在停止期间进行清理资源。
  2. dubbo-demo——示例模块:这个模块是快速启动示例,其中包含了服务提供方和调用方,注册中心用的是multicast,用XML配置方法。
  3. dubbo-filter——过滤器模块:一些过滤器。
  4. dubbo-plugin——插件模块。
  5. dubbo-serialization——序列化模块:封装了各类序列化框架的支持实现。
  6. dubbo-test——测试模块:封装了针对dubbo的性能测试、兼容性测试等功能。

二、拓展的基石 - dubbo SPI

不同于java的SPI实现,dubbo 的 SPI 要求:

  1. 配置文件需放置在 META-INF/dubbo 路径下
  2. 格式是 name = 类的全路径限定名 格式,换行符分割,这样可以实现按需加载制定的类,如:
optimusPrime = org.apache.spi.OptimusPrime
bumblebee = org.apache.spi.Bumblebee
  1. 被拓展的接口上需要 @SPI 注解
  2. 拓展类上可以有 @Adaptive 注解,如果有这个注解,dubbo会对拓展类实行“自适应拓展加载”(即:不想在框架启动阶段被加载,而是希望在拓展方法被调用时,根据运行时参数进行加载。其实现原理是:首先 Dubbo 会为拓展接口生成具有代理功能的代码。然后通过 javassist 或 jdk 编译这段代码,得到 Class 类。最后再通过反射创建代理类)

Dubbo SPI 机制的入口是 dubbo-common 包的 org.apache.dubbo.common.extension.ExtensionLoader.getExtensionLoader(String name),其接收一个"name",根据这个name 在 META-INF/dubbo 下对应接口名字的文件中加载指定的实现类。getExtensionLoader()的逻辑很简单,就是从本地缓存查待加载类的实例,存在则返回,不存在则调用 createExtension(String name) 创建,所以这个是其核心代码:

private T createExtension(String name) {
   // 从配置文件中加载所有的拓展类,可得到“配置项名称”到“配置类”的映射关系表
   Class<?> clazz = getExtensionClasses().get(name);
   if (clazz == null) {
       throw findException(name);
   }
   try {
       T instance = (T) EXTENSION_INSTANCES.get(clazz);
       if (instance == null) {
           // 通过反射创建实例
           EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
           instance = (T) EXTENSION_INSTANCES.get(clazz);
       }
       // 向实例中注入依赖
       injectExtension(instance);
       Set<Class<?>> wrapperClasses = cachedWrapperClasses;
       if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
           // 循环创建 Wrapper 实例
           for (Class<?> wrapperClass : wrapperClasses) {
               // 将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建 Wrapper 实例。
               // 然后向 Wrapper 实例中注入依赖,最后将 Wrapper 实例再次赋值给 instance 变量
               instance = injectExtension(
                   (T) wrapperClass.getConstructor(type).newInstance(instance));
           }
       }
       return instance;
   } catch (Throwable t) {
       throw new IllegalStateException("...");
   }
}

通过代码,我们发现步骤如下:

  1. 通过 getExtensionClasses 获取所有的拓展类,getExtensionClasses会依次调用 loadExtensionClasses(), 调用 loadDirectory() 加载指定文件夹配置文件,并最终调用 loadResource() 加载资源
  2. 通过反射创建拓展对象
  3. 向拓展对象中注入依赖,Dubbo IOC 具体实现
  4. 将拓展对象包裹在相应的 Wrapper 对象中,AOP 具体实现

Dubbo IOC 的实现如下:

private T injectExtension(T instance) {
   try {
       if (objectFactory != null) {
           // 遍历目标类的所有方法
           for (Method method : instance.getClass().getMethods()) {
               // 检测方法是否以 set 开头,且方法仅有一个参数,且方法访问级别为 public
               if (method.getName().startsWith("set")
                   && method.getParameterTypes().length == 1
                   && Modifier.isPublic(method.getModifiers())) {
                   // 获取 setter 方法参数类型
                   Class<?> pt = method.getParameterTypes()[0];
                   try {
                       // 获取属性名,比如 setName 方法对应属性名 name
                       String property = method.getName().length() > 3 ? 
                           method.getName().substring(3, 4).toLowerCase() + 
                           	method.getName().substring(4) : "";
                       // 从 ObjectFactory 中获取依赖对象
                       Object object = objectFactory.getExtension(pt, property);
                       if (object != null) {
                           // 通过反射调用 setter 方法设置依赖
                           method.invoke(instance, object);
                       }
                   } catch (Exception e) {
                       logger.error("fail to inject via method...");
                   }
               }
           }
       }
   } catch (Exception e) {
       logger.error(e.getMessage(), e);
   }
   return instance;
}

Dubbo IOC 是通过 setter 方法注入依赖。Dubbo 首先会通过反射获取到实例的所有方法,然后再遍历方法列表,检测方法名是否具有 setter 方法特征。若有,则通过 ObjectFactory 获取依赖对象,最后通过反射调用 setter 方法将依赖设置到目标对象中。

三、服务导出

Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导出逻辑。整个逻辑大致可分为三个部分,第一部分是前置工作,主要用于检查参数,组装 URL。第二部分是导出服务,包含导出服务到本地 (JVM),和导出服务到远程两个过程。第三部分是向注册中心注册服务,用于服务发现。
服务导出的入口在onApplicationEvent:

public void onApplicationEvent(ContextRefreshedEvent event) {
    // 是否有延迟导出 && 是否已导出 && 是不是已被取消导出
    if (isDelay() && !isExported() && !isUnexported()) {
        // 导出服务
        export();
    }
}

export()的逻辑比较简单就不贴代码了,主要根据状态判断立即导出还是延迟导出,在 export() 中的 doExport() 会做如下的校验:

  1. 检测 <dubbo:service> 标签的 interface 属性合法性,不合法则抛出异常
  2. 检测 ProviderConfig、ApplicationConfig 等核心配置类对象是否为空,若为空,则尝试从其他配置类对象中获取相应的实例。
  3. 检测并处理泛化服务和普通服务类
  4. 检测本地存根配置,并进行相应的处理
  5. 对 ApplicationConfig、RegistryConfig 等配置类进行检测,为空则尝试创建,若无法创建则抛出异常
    校验完成后会执行真正的导出方法:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    
    // 省略无关代码
    
    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        // 加载 ConfiguratorFactory,并生成 Configurator 实例,然后通过实例配置 url
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }

    String scope = url.getParameter(Constants.SCOPE_KEY);
    // 如果 scope = none,则什么都不做
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        // scope != remote,导出到本地
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            exportLocal(url);
        }

        // scope != local,导出到远程
        if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
            if (registryURLs != null && !registryURLs.isEmpty()) {
                for (URL registryURL : registryURLs) {
                    url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                    // 加载监视器链接
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        // 将监视器链接作为参数添加到 url 中
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }

                    String proxy = url.getParameter(Constants.PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                    }

                    // 为服务提供类(ref)生成 Invoker
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    // DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    // 导出服务,并生成 Exporter
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
                
            // 不存在注册中心,仅导出服务
            } else {
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        }
    }
    this.urls.add(url);
}

由于分支太多,每个分支的含义整理如下:

// 获取 ArgumentConfig 列表
for (遍历 ArgumentConfig 列表) {
    if (type 不为 null,也不为空串) {    // 分支1
        1. 通过反射获取 interfaceClass 的方法列表
        for (遍历方法列表) {
            1. 比对方法名,查找目标方法
        	2. 通过反射获取目标方法的参数类型数组 argtypes
            if (index != -1) {    // 分支2
                1. 从 argtypes 数组中获取下标 index 处的元素 argType
                2. 检测 argType 的名称与 ArgumentConfig 中的 type 属性是否一致
                3. 添加 ArgumentConfig 字段信息到 map 中,或抛出异常
            } else {    // 分支3
                1. 遍历参数类型数组 argtypes,查找 argument.type 类型的参数
                2. 添加 ArgumentConfig 字段信息到 map 中
            }
        }
    } else if (index != -1) {    // 分支4
		1. 添加 ArgumentConfig 字段信息到 map 中
    }
}

服务导出后会进行服务注册,具体的注册逻辑在刚才代码 export() 中:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    
    // ${导出服务}
    
    // 省略其他代码
    
    boolean register = registeredProviderUrl.getParameter("register", true);
    if (register) {
        // 注册服务
        register(registryUrl, registeredProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }
    
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    // 订阅 override 数据
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    // 省略部分代码
}

其中 register() 中 getRegistry() 会创建并返回注册中心,创建注册中心会调用 createRegistry(),createRegistry()是一个拓展点,取决于SPI加载的子类,以 zookeeper 注册中心为例:

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    
    // 获取组名,默认为 dubbo
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        // group = "/" + group
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    // 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
    zkClient = zookeeperTransporter.connect(url);
    // 添加状态监听器
    zkClient.addStateListener(new StateListener() {
        @Override
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

ZookeeperTransporter 的 connect() 创建Zookeeper 客户端,创建好后意味着注册中心就创建结束了。这里的 zookeeperTransporter 类型为自适应拓展类,因此 connect 方法会在被调用时决定加载什么类型的 ZookeeperTransporter 拓展,默认为 CuratorZookeeperTransporter,以CuratorZookeeperTransporter 为例:

public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {

   private final CuratorFramework client;
   
   public CuratorZookeeperClient(URL url) {
       super(url);
       try {
           // 创建 CuratorFramework 构造器
           CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                   .connectString(url.getBackupAddress())
                   .retryPolicy(new RetryNTimes(1, 1000))
                   .connectionTimeoutMs(5000);
           String authority = url.getAuthority();
           if (authority != null && authority.length() > 0) {
               builder = builder.authorization("digest", authority.getBytes());
           }
           // 构建 CuratorFramework 实例
           client = builder.build();
           // 添加监听器
           client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
               @Override
               public void stateChanged(CuratorFramework client, ConnectionState state) {
                   if (state == ConnectionState.LOST) {
                       CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                   } else if (state == ConnectionState.CONNECTED) {
                       CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                   } else if (state == ConnectionState.RECONNECTED) {
                       CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                   }
               }
           });
           
           // 启动客户端
           client.start();
       } catch (Exception e) {
           throw new IllegalStateException(e.getMessage(), e);
       }
   }
}

就是创建 CuratorZookeeperClient 并启动 CuratorFramework 实例,可参考 Curator 官方文档

四、服务调用

Dubbo 服务引用的时机有两个,第一个是在 Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务,第二个是在 ReferenceBean 对应的服务被注入到其他类中时引用。这两个引用服务的时机区别在于,第一个是饿汉式的,第二个是懒汉式的。默认情况下,Dubbo 使用懒汉式引用服务。如果需要使用饿汉式,可通过配置 <dubbo:reference> 的 init 属性开启
服务调用有两种方式:直连和基于注册中心进行引用。此处以注册中心引用为例。
当我们的服务被注入到其他类中时,Spring 会第一时间调用 getObject()中的 init(),并由该方法执行服务引用逻辑。得到 Invoker 实例,Invoker具备调用服务的功能但不能直接暴露给用户,会造成业务代码的入侵,所以还需要通过代理工厂类 ProxyFactory 为服务接口生成代理类,并让代理类去调用 Invoker 逻辑。
init() 的方法比较长也不是很重要主要做下面的事情:

  1. 检测 ConsumerConfig 实例是否存在,如不存在则创建一个新的实例,然后通过系统变量或 dubbo.properties 配置文件填充 ConsumerConfig 的字段
  2. 检测泛化配置,并根据配置设置 interfaceClass 的值
  3. 从系统属性或配置文件中加载与接口名相对应的配置,并将解析结果赋值给 url 字段。url 字段的作用一般是用于点对点调用
  4. 收集各种配置,并将配置存储到 map 中
  5. 处理 MethodConfig 实例。该实例包含了事件通知配置,比如 onreturn、onthrow、oninvoke 等。
  6. 解析服务消费者 ip,以及调用 createProxy 创建代理对象

createProxy() 看名字以为是创建代理对象的。但实际上并非如此,该方法还会调用其他方法构建以及合并 Invoker 实例,真正重要的是其中的 refer() 方法:

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);
    // 创建 DubboInvoker
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

其中,getClients() 用于获取客户端实例,实例类型为 ExchangeClient。ExchangeClient 实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信。比如 NettyClient、MinaClient 等,默认情况下,Dubbo 使用 NettyClient 进行通信。

完成Invoker 创建后,接下来就是为服务接口生成代理对象,有了代理对象后就可以进行远程调用。代理对象生成的入口方法为 ProxyFactory 的 getProxy:

public static Proxy getProxy(Class<?>... ics) {
    // 调用重载方法
    return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
}

public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
    if (ics.length > 65535)
        throw new IllegalArgumentException("interface limit exceeded");

    StringBuilder sb = new StringBuilder();
    // 遍历接口列表
    for (int i = 0; i < ics.length; i++) {
        String itf = ics[i].getName();
        // 检测类型是否为接口
        if (!ics[i].isInterface())
            throw new RuntimeException(itf + " is not a interface.");

        Class<?> tmp = null;
        try {
            // 重新加载接口类
            tmp = Class.forName(itf, false, cl);
        } catch (ClassNotFoundException e) {
        }

        // 检测接口是否相同,这里 tmp 有可能为空
        if (tmp != ics[i])
            throw new IllegalArgumentException(ics[i] + " is not visible from class loader");

        // 拼接接口全限定名,分隔符为 ;
        sb.append(itf).append(';');
    }

    // 使用拼接后的接口名作为 key
    String key = sb.toString();

    Map<String, Object> cache;
    synchronized (ProxyCacheMap) {
        cache = ProxyCacheMap.get(cl);
        if (cache == null) {
            cache = new HashMap<String, Object>();
            ProxyCacheMap.put(cl, cache);
        }
    }

    Proxy proxy = null;
    synchronized (cache) {
        do {
            // 从缓存中获取 Reference<Proxy> 实例
            Object value = cache.get(key);
            if (value instanceof Reference<?>) {
                proxy = (Proxy) ((Reference<?>) value).get();
                if (proxy != null) {
                    return proxy;
                }
            }

            // 并发控制,保证只有一个线程可以进行后续操作
            if (value == PendingGenerationMarker) {
                try {
                    // 其他线程在此处进行等待
                    cache.wait();
                } catch (InterruptedException e) {
                }
            } else {
                // 放置标志位到缓存中,并跳出 while 循环进行后续操作
                cache.put(key, PendingGenerationMarker);
                break;
            }
        }
        while (true);
    }

    long id = PROXY_CLASS_COUNTER.getAndIncrement();
    String pkg = null;
    ClassGenerator ccp = null, ccm = null;
    try {
        // 创建 ClassGenerator 对象
        ccp = ClassGenerator.newInstance(cl);

        Set<String> worked = new HashSet<String>();
        List<Method> methods = new ArrayList<Method>();

        for (int i = 0; i < ics.length; i++) {
            // 检测接口访问级别是否为 protected 或 privete
            if (!Modifier.isPublic(ics[i].getModifiers())) {
                // 获取接口包名
                String npkg = ics[i].getPackage().getName();
                if (pkg == null) {
                    pkg = npkg;
                } else {
                    if (!pkg.equals(npkg))
                        // 非 public 级别的接口必须在同一个包下,否者抛出异常
                        throw new IllegalArgumentException("non-public interfaces from different packages");
                }
            }
            
            // 添加接口到 ClassGenerator 中
            ccp.addInterface(ics[i]);

            // 遍历接口方法
            for (Method method : ics[i].getMethods()) {
                // 获取方法描述,可理解为方法签名
                String desc = ReflectUtils.getDesc(method);
                // 如果方法描述字符串已在 worked 中,则忽略。考虑这种情况,
                // A 接口和 B 接口中包含一个完全相同的方法
                if (worked.contains(desc))
                    continue;
                worked.add(desc);

                int ix = methods.size();
                // 获取方法返回值类型
                Class<?> rt = method.getReturnType();
                // 获取参数列表
                Class<?>[] pts = method.getParameterTypes();

                // 生成 Object[] args = new Object[1...N]
                StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                for (int j = 0; j < pts.length; j++)
                    // 生成 args[1...N] = ($w)$1...N;
                    code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                // 生成 InvokerHandler 接口的 invoker 方法调用语句,如下:
                // Object ret = handler.invoke(this, methods[1...N], args);
                code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");

                // 返回值不为 void
                if (!Void.TYPE.equals(rt))
                    // 生成返回语句,形如 return (java.lang.String) ret;
                    code.append(" return ").append(asArgument(rt, "ret")).append(";");

                methods.add(method);
                // 添加方法名、访问控制符、参数列表、方法代码等信息到 ClassGenerator 中 
                ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
            }
        }

        if (pkg == null)
            pkg = PACKAGE_NAME;

        // 构建接口代理类名称:pkg + ".proxy" + id,比如 org.apache.dubbo.proxy0
        String pcn = pkg + ".proxy" + id;
        ccp.setClassName(pcn);
        ccp.addField("public static java.lang.reflect.Method[] methods;");
        // 生成 private java.lang.reflect.InvocationHandler handler;
        ccp.addField("private " + InvocationHandler.class.getName() + " handler;");

        // 为接口代理类添加带有 InvocationHandler 参数的构造方法,比如:
        // porxy0(java.lang.reflect.InvocationHandler arg0) {
        //     handler=$1;
    	// }
        ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
        // 为接口代理类添加默认构造方法
        ccp.addDefaultConstructor();
        
        // 生成接口代理类
        Class<?> clazz = ccp.toClass();
        clazz.getField("methods").set(null, methods.toArray(new Method[0]));

        // 构建 Proxy 子类名称,比如 Proxy1,Proxy2 等
        String fcn = Proxy.class.getName() + id;
        ccm = ClassGenerator.newInstance(cl);
        ccm.setClassName(fcn);
        ccm.addDefaultConstructor();
        ccm.setSuperClass(Proxy.class);
        // 为 Proxy 的抽象方法 newInstance 生成实现代码,形如:
        // public Object newInstance(java.lang.reflect.InvocationHandler h) { 
        //     return new org.apache.dubbo.proxy0($1);
        // }
        ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
        // 生成 Proxy 实现类
        Class<?> pc = ccm.toClass();
        // 通过反射创建 Proxy 实例
        proxy = (Proxy) pc.newInstance();
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        if (ccp != null)
            // 释放资源
            ccp.release();
        if (ccm != null)
            ccm.release();
        synchronized (cache) {
            if (proxy == null)
                cache.remove(key);
            else
                // 写缓存
                cache.put(key, new WeakReference<Proxy>(proxy));
            // 唤醒其他等待线程
            cache.notifyAll();
        }
    }
    return proxy;
}

比较复杂,需要根据注释仔细研读。以上就是服务的调用重要源码。

五、负载均衡

Dubbo 提供了4种负载均衡实现,分别是:

  1. 基于权重随机算法的 RandomLoadBalance
  2. 基于最少活跃调用数算法的 LeastActiveLoadBalance
  3. 基于 hash 一致性的 ConsistentHashLoadBalance
  4. 基于加权轮询算法的 RoundRobinLoadBalance

Dubbo 中,所有负载均衡实现类均继承自 AbstractLoadBalance,该类实现了 LoadBalance 接口,并封装了一些公共的逻辑。select() 是负载均衡的入口方法。select() 中逻辑很简单主要是调用 SPI 实现类的 soSelect() 方法完成负载均衡,下面依次介绍四种负载均衡算法的实现:

5.1 基于权重随机算法的 RandomLoadBalance
public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";

    private final Random random = new Random();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size();
        int totalWeight = 0;
        boolean sameWeight = true;
        // 下面这个循环有两个作用,第一是计算总权重 totalWeight,
        // 第二是检测每个服务提供者的权重是否相同
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            // 累加权重
            totalWeight += weight;
            // 检测当前服务提供者的权重与上一个服务提供者的权重是否相同,
            // 不相同的话,则将 sameWeight 置为 false。
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false;
            }
        }
        
        // 下面的 if 分支主要用于获取随机数,并计算随机数落在哪个区间上
        if (totalWeight > 0 && !sameWeight) {
            // 随机获取一个 [0, totalWeight) 区间内的数字
            int offset = random.nextInt(totalWeight);
            // 循环让 offset 数减去服务提供者权重值,当 offset 小于0时,返回相应的 Invoker。
            // 举例说明一下,我们有 servers = [A, B, C],weights = [5, 3, 2],offset = 7。
            // 第一次循环,offset - 5 = 2 > 0,即 offset > 5,
            // 表明其不会落在服务器 A 对应的区间上。
            // 第二次循环,offset - 3 = -1 < 0,即 5 < offset < 8,
            // 表明其会落在服务器 B 对应的区间上
            for (int i = 0; i < length; i++) {
                // 让随机值 offset 减去权重值
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    // 返回相应的 Invoker
                    return invokers.get(i);
                }
            }
        }
        
        // 如果所有服务提供者权重值相同,此时直接随机返回一个即可
        return invokers.get(random.nextInt(length));
    }
}

它的算法思想很简单。假设我们有一组服务器 servers = [A, B, C],他们对应的权重为 weights = [5, 3, 2],权重总和为10。现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。

5.2 基于最少活跃调用数算法的 LeastActiveLoadBalance
public class LeastActiveLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "leastactive";

    private final Random random = new Random();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size();
        // 最小的活跃数
        int leastActive = -1;
        // 具有相同“最小活跃数”的服务者提供者(以下用 Invoker 代称)数量
        int leastCount = 0; 
        // leastIndexs 用于记录具有相同“最小活跃数”的 Invoker 在 invokers 列表中的下标信息
        int[] leastIndexs = new int[length];
        int totalWeight = 0;
        // 第一个最小活跃数的 Invoker 权重值,用于与其他具有相同最小活跃数的 Invoker 的权重进行对比,
        // 以检测是否“所有具有相同最小活跃数的 Invoker 的权重”均相等
        int firstWeight = 0;
        boolean sameWeight = true;

        // 遍历 invokers 列表
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            // 获取 Invoker 对应的活跃数
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
            // 获取权重 - ⭐️
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
            // 发现更小的活跃数,重新开始
            if (leastActive == -1 || active < leastActive) {
            	// 使用当前活跃数 active 更新最小活跃数 leastActive
                leastActive = active;
                // 更新 leastCount 为 1
                leastCount = 1;
                // 记录当前下标值到 leastIndexs 中
                leastIndexs[0] = i;
                totalWeight = weight;
                firstWeight = weight;
                sameWeight = true;

            // 当前 Invoker 的活跃数 active 与最小活跃数 leastActive 相同 
            } else if (active == leastActive) {
            	// 在 leastIndexs 中记录下当前 Invoker 在 invokers 集合中的下标
                leastIndexs[leastCount++] = i;
                // 累加权重
                totalWeight += weight;
                // 检测当前 Invoker 的权重与 firstWeight 是否相等,
                // 不相等则将 sameWeight 置为 false
                if (sameWeight && i > 0
                    && weight != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        
        // 当只有一个 Invoker 具有最小活跃数,此时直接返回该 Invoker 即可
        if (leastCount == 1) {
            return invokers.get(leastIndexs[0]);
        }

        // 有多个 Invoker 具有相同的最小活跃数,但它们之间的权重不同
        if (!sameWeight && totalWeight > 0) {
        	// 随机生成一个 [0, totalWeight) 之间的数字
            int offsetWeight = random.nextInt(totalWeight);
            // 循环让随机数减去具有最小活跃数的 Invoker 的权重值,
            // 当 offset 小于等于0时,返回相应的 Invoker
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexs[i];
                // 获取权重值,并让随机数减去权重值 - ⭐️
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0)
                    return invokers.get(leastIndex);
            }
        }
        // 如果权重相同或权重为0时,随机返回一个 Invoker
        return invokers.get(leastIndexs[random.nextInt(leastCount)]);
    }
}

代码过程如下:

  1. 遍历 invokers 列表,寻找活跃数最小的 Invoker
  2. 如果有多个 Invoker 具有相同的最小活跃数,此时记录下这些 Invoker
  3. 在 invokers 集合中的下标,并累加它们的权重,比较它们的权重值是否相等
  4. 如果只有一个 Invoker 具有最小的活跃数,此时直接返回该 Invoker 即可
  5. 如果有多个 Invoker 具有最小活跃数,且它们的权重不相等,此时处理方式和 RandomLoadBalance 一致
  6. 如果有多个 Invoker 具有最小活跃数,但它们的权重相等,此时随机返回一个即可
    从代码可以看出来,除了最小活跃数,LeastActiveLoadBalance 在实现上还引入了权重值。所以准确的来说,LeastActiveLoadBalance 是基于加权最小活跃数算法实现的。举个例子说明一下,在一个服务提供者集群中,有两个性能优异的服务提供者。某一时刻它们的活跃数相同,此时 Dubbo 会根据它们的权重去分配请求,权重越大,获取到新请求的概率就越大。如果两个服务提供者权重相同,此时随机选择一个即可。
5.3 基于 hash 一致性的 ConsistentHashLoadBalance

一致性hash算法原理是:首先根据 ip 或者其他的信息为缓存节点生成一个 hash,并将这个 hash 投射到 [0, 232 - 1] 的圆环上。当有查询或写入请求时,则为缓存项的 key 生成一个 hash 值。然后查找第一个大于或等于该 hash 值的缓存节点,并到这个节点中查询或写入缓存项。如果当前节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其 hash 值的缓存节点即可。
dubbo 对其的实现类 ConsistentHashLoadBalance的 doSelect()方法只做了一些前置工作,真正实现在 ConsistentHashSelector 的 select方法中,其中 virtualInvokers 是一个 treeMap,记录了各个节点的虚拟节点的 hash -> Invoker,初始化工作在 ConsistentHashSelector 的构造方法中完成,由于篇幅原因没有贴出来,下面的 select() 算法的实际步骤:

public Invoker<T> select(Invocation invocation) {
    // 将参数转为 key
    String key = toKey(invocation.getArguments());
    // 对参数 key 进行 md5 运算
    byte[] digest = md5(key);
    // 取 digest 数组的前四个字节进行 hash 运算,再将 hash 值传给 selectForKey 方法,
    // 寻找合适的 Invoker
    return selectForKey(hash(digest, 0));
}

private Invoker<T> selectForKey(long hash) {
    // 到 TreeMap 中查找第一个节点值大于或等于当前 hash 的 Invoker
    Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
    // 如果 hash 大于 Invoker 在圆环上最大的位置,此时 entry = null,
    // 需要将 TreeMap 的头节点赋值给 entry
    if (entry == null) {
        entry = virtualInvokers.firstEntry();
    }

    // 返回 Invoker
    return entry.getValue();
}
5.4 基于加权轮询算法的 RoundRobinLoadBalance
public class RoundRobinLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "roundrobin";

    private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    private final ConcurrentMap<String, AtomicPositiveInteger> indexSeqs = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size();
        int maxWeight = 0;
        int minWeight = Integer.MAX_VALUE;
        final List<Invoker<T>> invokerToWeightList = new ArrayList<>();
        
        // 查找最大和最小权重
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            maxWeight = Math.max(maxWeight, weight);
            minWeight = Math.min(minWeight, weight);
            if (weight > 0) {
                invokerToWeightList.add(invokers.get(i));
            }
        }
        
        // 获取当前服务对应的调用序列对象 AtomicPositiveInteger
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            // 创建 AtomicPositiveInteger,默认值为0
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        
        // 获取下标序列对象 AtomicPositiveInteger
        AtomicPositiveInteger indexSeq = indexSeqs.get(key);
        if (indexSeq == null) {
            // 创建 AtomicPositiveInteger,默认值为 -1
            indexSeqs.putIfAbsent(key, new AtomicPositiveInteger(-1));
            indexSeq = indexSeqs.get(key);
        }

        if (maxWeight > 0 && minWeight < maxWeight) {
            length = invokerToWeightList.size();
            while (true) {
                int index = indexSeq.incrementAndGet() % length;
                int currentWeight = sequence.get() % maxWeight;

                // 每循环一轮(index = 0),重新计算 currentWeight
                if (index == 0) {
                    currentWeight = sequence.incrementAndGet() % maxWeight;
                }
                
                // 检测 Invoker 的权重是否大于 currentWeight,大于则返回
                if (getWeight(invokerToWeightList.get(index), invocation) > currentWeight) {
                    return invokerToWeightList.get(index);
                }
            }
        }
        
        // 所有 Invoker 权重相等,此时进行普通的轮询即可
        return invokers.get(sequence.incrementAndGet() % length);
    }
}

代码逻辑是这样的:每进行一轮循环,重新计算 currentWeight。如果当前 Invoker 权重大于 currentWeight,则返回该 Invoker。下面举例说明,假设服务器 [A, B, C] 对应权重 [5, 2, 1]。

  1. 第一轮循环,currentWeight = 1,可返回 A 和 B
  2. 第二轮循环,currentWeight = 2,返回 A
  3. 第三轮循环,currentWeight = 3,返回 A
  4. 第四轮循环,currentWeight = 4,返回 A
  5. 第五轮循环,currentWeight = 0,返回 A, B, C
    如上,这里的一轮循环是指 index 再次变为0所经历过的循环,这里可以把 index = 0 看做是一轮循环的开始。每一轮循环的次数与 Invoker 的数量有关,Invoker 数量通常不会太多,所以我们可以认为上面代码的时间复杂度为常数级。

以上就是 dubbo 四种负载均衡算法的实现

六、总结

Dubbo 是目前最流行的 rpc 框架之一,也是面试过程中经常遇到的面试题,了解其中的原理有利于更好的使用,更能化解面试中的尴尬,还能丰富自身的编程经验和架构经验。dubbo 还更多的源码值得深入分析,但服务提供和服务调用是其中比较重要的组成部分,其他还包括:通信协议、服务治理、服务路由、高可用集群搭建模式等等。将在以后逐步解析。

相关标签: 应用架构