Dubbo源码学习
Dubbo源码学习
一、概述
Dubbo是一款高性能、轻量级基于Java的RPC开源框架。平时使用的非常多。但仅仅使用很难了解背后的原理,更不用提经常出现在面试中,这篇文章主要从源码的角度解析 dubbo 比较重要的三个模块:服务导出、服务引入、负载均衡。还有很多其他重要模块如:服务路由、消费者集群、通信协议、服务治理等等,由于篇幅原因将在以后的其他博客分享。
1.1 主要组件
Dubbo 在使用上有5个主要的组成部分:
- Provider: 服务提供方
- Consumer: 服务调用方
- Registry: 服务注册与发现的注册中心
- Monitor: 统计服务的调用次数和调用时间的监控中心
- Container: 服务运行容器
1.2 项目整体设计
- 左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口
- 图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,其中,Service 和 Config 层为 API,其它各层均为 SPI
- 图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类
- 蓝色虚线为初始化过程,即启动时组装链
- 红色实线为方法调用过程,即运行时调时链
- 紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法
- 主要的层级和说明
- config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类
- proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
- registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
- cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance
- protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
- transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
- serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool
1.3 源码模块
源码地址:https://github.com/apache/dubbo.git
主要模块与说明:
- dubbo-registry——注册中心模块:各种注册中心(Multicast、Zookeeper、Redis、Simple)的实现和注册中心下发地址的实现。
- dubbo-cluster——集群模块:集群的抽象,以及 集群配置、负载均衡、容错、路由 的实现,集群的地址列表可以是静态配置的,也可以是由注册中心下发。
- dubbo-common——公共逻辑模块:各种工具类和通用的模型。serialize 层放在 common 模块中,以便更大程度复用。
- dubbo-config——配置模块:Dubbo 对外的 API,用户通过 Config 使用Dubbo,隐藏 Dubbo 所有细节,Dubbo的4种配置方式:XML配置、属性配置、API配置、注解配置
- dubbo-rpc——远程调用模块:抽象各种协议,以及动态代理,只包含一对一的调用,不关心集群的管理。protocol 层和 proxy 层都放在 rpc 模块中,这两层是 rpc 的核心,在不需要集群也就是只有一个提供者时,可以只使用这两层完成 rpc 调用。
- dubbo-remoting——远程通信模块:Dubbo 协议的实现,提供了多种客户端和服务端通信功能,比如基于Grizzly、Netty、Tomcat等等,RPC用除了RMI的协议都要用到此模块。transport 层和 exchange 层都放在 remoting 模块中,为 rpc 调用的通讯基础
- dubbo-container——容器模块:一个 Standlone 的容器,以简单的 Main 加载 Spring 启动,因为后台服务并不需要Tomcat/JBoss 等Web 容器的特性,没必要用 Web 容器去加载服务
- dubbo-monitor——监控模块:统计服务调用次数,调用时间的,调用链跟踪的服务。
其他模块与说明
- dubbo-bootstrap——清理模块:这个模块只有一个类,是作为dubbo的引导类,并且在停止期间进行清理资源。
- dubbo-demo——示例模块:这个模块是快速启动示例,其中包含了服务提供方和调用方,注册中心用的是multicast,用XML配置方法。
- dubbo-filter——过滤器模块:一些过滤器。
- dubbo-plugin——插件模块。
- dubbo-serialization——序列化模块:封装了各类序列化框架的支持实现。
- dubbo-test——测试模块:封装了针对dubbo的性能测试、兼容性测试等功能。
二、拓展的基石 - dubbo SPI
不同于java的SPI实现,dubbo 的 SPI 要求:
- 配置文件需放置在 META-INF/dubbo 路径下
- 格式是
name = 类的全路径限定名
格式,换行符分割,这样可以实现按需加载制定的类,如:
optimusPrime = org.apache.spi.OptimusPrime
bumblebee = org.apache.spi.Bumblebee
- 被拓展的接口上需要 @SPI 注解
- 拓展类上可以有 @Adaptive 注解,如果有这个注解,dubbo会对拓展类实行“自适应拓展加载”(即:不想在框架启动阶段被加载,而是希望在拓展方法被调用时,根据运行时参数进行加载。其实现原理是:首先 Dubbo 会为拓展接口生成具有代理功能的代码。然后通过 javassist 或 jdk 编译这段代码,得到 Class 类。最后再通过反射创建代理类)
Dubbo SPI 机制的入口是 dubbo-common 包的 org.apache.dubbo.common.extension.ExtensionLoader.getExtensionLoader(String name),其接收一个"name",根据这个name 在 META-INF/dubbo 下对应接口名字的文件中加载指定的实现类。getExtensionLoader()的逻辑很简单,就是从本地缓存查待加载类的实例,存在则返回,不存在则调用 createExtension(String name) 创建,所以这个是其核心代码:
private T createExtension(String name) {
// 从配置文件中加载所有的拓展类,可得到“配置项名称”到“配置类”的映射关系表
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// 通过反射创建实例
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 向实例中注入依赖
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
// 循环创建 Wrapper 实例
for (Class<?> wrapperClass : wrapperClasses) {
// 将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建 Wrapper 实例。
// 然后向 Wrapper 实例中注入依赖,最后将 Wrapper 实例再次赋值给 instance 变量
instance = injectExtension(
(T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("...");
}
}
通过代码,我们发现步骤如下:
- 通过 getExtensionClasses 获取所有的拓展类,getExtensionClasses会依次调用 loadExtensionClasses(), 调用 loadDirectory() 加载指定文件夹配置文件,并最终调用 loadResource() 加载资源
- 通过反射创建拓展对象
- 向拓展对象中注入依赖,Dubbo IOC 具体实现
- 将拓展对象包裹在相应的 Wrapper 对象中,AOP 具体实现
Dubbo IOC 的实现如下:
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
// 遍历目标类的所有方法
for (Method method : instance.getClass().getMethods()) {
// 检测方法是否以 set 开头,且方法仅有一个参数,且方法访问级别为 public
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
// 获取 setter 方法参数类型
Class<?> pt = method.getParameterTypes()[0];
try {
// 获取属性名,比如 setName 方法对应属性名 name
String property = method.getName().length() > 3 ?
method.getName().substring(3, 4).toLowerCase() +
method.getName().substring(4) : "";
// 从 ObjectFactory 中获取依赖对象
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
// 通过反射调用 setter 方法设置依赖
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method...");
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
Dubbo IOC 是通过 setter 方法注入依赖。Dubbo 首先会通过反射获取到实例的所有方法,然后再遍历方法列表,检测方法名是否具有 setter 方法特征。若有,则通过 ObjectFactory 获取依赖对象,最后通过反射调用 setter 方法将依赖设置到目标对象中。
三、服务导出
Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导出逻辑。整个逻辑大致可分为三个部分,第一部分是前置工作,主要用于检查参数,组装 URL。第二部分是导出服务,包含导出服务到本地 (JVM),和导出服务到远程两个过程。第三部分是向注册中心注册服务,用于服务发现。
服务导出的入口在onApplicationEvent:
public void onApplicationEvent(ContextRefreshedEvent event) {
// 是否有延迟导出 && 是否已导出 && 是不是已被取消导出
if (isDelay() && !isExported() && !isUnexported()) {
// 导出服务
export();
}
}
export()的逻辑比较简单就不贴代码了,主要根据状态判断立即导出还是延迟导出,在 export() 中的 doExport() 会做如下的校验:
- 检测 <dubbo:service> 标签的 interface 属性合法性,不合法则抛出异常
- 检测 ProviderConfig、ApplicationConfig 等核心配置类对象是否为空,若为空,则尝试从其他配置类对象中获取相应的实例。
- 检测并处理泛化服务和普通服务类
- 检测本地存根配置,并进行相应的处理
- 对 ApplicationConfig、RegistryConfig 等配置类进行检测,为空则尝试创建,若无法创建则抛出异常
校验完成后会执行真正的导出方法:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// 省略无关代码
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
// 加载 ConfiguratorFactory,并生成 Configurator 实例,然后通过实例配置 url
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(Constants.SCOPE_KEY);
// 如果 scope = none,则什么都不做
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// scope != remote,导出到本地
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// scope != local,导出到远程
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
// 加载监视器链接
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
// 将监视器链接作为参数添加到 url 中
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
// 为服务提供类(ref)生成 Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 导出服务,并生成 Exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
// 不存在注册中心,仅导出服务
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
由于分支太多,每个分支的含义整理如下:
// 获取 ArgumentConfig 列表
for (遍历 ArgumentConfig 列表) {
if (type 不为 null,也不为空串) { // 分支1
1. 通过反射获取 interfaceClass 的方法列表
for (遍历方法列表) {
1. 比对方法名,查找目标方法
2. 通过反射获取目标方法的参数类型数组 argtypes
if (index != -1) { // 分支2
1. 从 argtypes 数组中获取下标 index 处的元素 argType
2. 检测 argType 的名称与 ArgumentConfig 中的 type 属性是否一致
3. 添加 ArgumentConfig 字段信息到 map 中,或抛出异常
} else { // 分支3
1. 遍历参数类型数组 argtypes,查找 argument.type 类型的参数
2. 添加 ArgumentConfig 字段信息到 map 中
}
}
} else if (index != -1) { // 分支4
1. 添加 ArgumentConfig 字段信息到 map 中
}
}
服务导出后会进行服务注册,具体的注册逻辑在刚才代码 export() 中:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// ${导出服务}
// 省略其他代码
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
// 注册服务
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 订阅 override 数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 省略部分代码
}
其中 register() 中 getRegistry() 会创建并返回注册中心,创建注册中心会调用 createRegistry(),createRegistry()是一个拓展点,取决于SPI加载的子类,以 zookeeper 注册中心为例:
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 获取组名,默认为 dubbo
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
// group = "/" + group
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
zkClient = zookeeperTransporter.connect(url);
// 添加状态监听器
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
ZookeeperTransporter 的 connect() 创建Zookeeper 客户端,创建好后意味着注册中心就创建结束了。这里的 zookeeperTransporter 类型为自适应拓展类,因此 connect 方法会在被调用时决定加载什么类型的 ZookeeperTransporter 拓展,默认为 CuratorZookeeperTransporter,以CuratorZookeeperTransporter 为例:
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {
private final CuratorFramework client;
public CuratorZookeeperClient(URL url) {
super(url);
try {
// 创建 CuratorFramework 构造器
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(5000);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
// 构建 CuratorFramework 实例
client = builder.build();
// 添加监听器
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState state) {
if (state == ConnectionState.LOST) {
CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
} else if (state == ConnectionState.CONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
} else if (state == ConnectionState.RECONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
}
}
});
// 启动客户端
client.start();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
就是创建 CuratorZookeeperClient 并启动 CuratorFramework 实例,可参考 Curator 官方文档
四、服务调用
Dubbo 服务引用的时机有两个,第一个是在 Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务,第二个是在 ReferenceBean 对应的服务被注入到其他类中时引用。这两个引用服务的时机区别在于,第一个是饿汉式的,第二个是懒汉式的。默认情况下,Dubbo 使用懒汉式引用服务。如果需要使用饿汉式,可通过配置 <dubbo:reference> 的 init 属性开启
服务调用有两种方式:直连和基于注册中心进行引用。此处以注册中心引用为例。
当我们的服务被注入到其他类中时,Spring 会第一时间调用 getObject()中的 init(),并由该方法执行服务引用逻辑。得到 Invoker 实例,Invoker具备调用服务的功能但不能直接暴露给用户,会造成业务代码的入侵,所以还需要通过代理工厂类 ProxyFactory 为服务接口生成代理类,并让代理类去调用 Invoker 逻辑。
init() 的方法比较长也不是很重要主要做下面的事情:
- 检测 ConsumerConfig 实例是否存在,如不存在则创建一个新的实例,然后通过系统变量或 dubbo.properties 配置文件填充 ConsumerConfig 的字段
- 检测泛化配置,并根据配置设置 interfaceClass 的值
- 从系统属性或配置文件中加载与接口名相对应的配置,并将解析结果赋值给 url 字段。url 字段的作用一般是用于点对点调用
- 收集各种配置,并将配置存储到 map 中
- 处理 MethodConfig 实例。该实例包含了事件通知配置,比如 onreturn、onthrow、oninvoke 等。
- 解析服务消费者 ip,以及调用 createProxy 创建代理对象
createProxy() 看名字以为是创建代理对象的。但实际上并非如此,该方法还会调用其他方法构建以及合并 Invoker 实例,真正重要的是其中的 refer() 方法:
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// 创建 DubboInvoker
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
其中,getClients() 用于获取客户端实例,实例类型为 ExchangeClient。ExchangeClient 实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信。比如 NettyClient、MinaClient 等,默认情况下,Dubbo 使用 NettyClient 进行通信。
完成Invoker 创建后,接下来就是为服务接口生成代理对象,有了代理对象后就可以进行远程调用。代理对象生成的入口方法为 ProxyFactory 的 getProxy:
public static Proxy getProxy(Class<?>... ics) {
// 调用重载方法
return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
}
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
if (ics.length > 65535)
throw new IllegalArgumentException("interface limit exceeded");
StringBuilder sb = new StringBuilder();
// 遍历接口列表
for (int i = 0; i < ics.length; i++) {
String itf = ics[i].getName();
// 检测类型是否为接口
if (!ics[i].isInterface())
throw new RuntimeException(itf + " is not a interface.");
Class<?> tmp = null;
try {
// 重新加载接口类
tmp = Class.forName(itf, false, cl);
} catch (ClassNotFoundException e) {
}
// 检测接口是否相同,这里 tmp 有可能为空
if (tmp != ics[i])
throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
// 拼接接口全限定名,分隔符为 ;
sb.append(itf).append(';');
}
// 使用拼接后的接口名作为 key
String key = sb.toString();
Map<String, Object> cache;
synchronized (ProxyCacheMap) {
cache = ProxyCacheMap.get(cl);
if (cache == null) {
cache = new HashMap<String, Object>();
ProxyCacheMap.put(cl, cache);
}
}
Proxy proxy = null;
synchronized (cache) {
do {
// 从缓存中获取 Reference<Proxy> 实例
Object value = cache.get(key);
if (value instanceof Reference<?>) {
proxy = (Proxy) ((Reference<?>) value).get();
if (proxy != null) {
return proxy;
}
}
// 并发控制,保证只有一个线程可以进行后续操作
if (value == PendingGenerationMarker) {
try {
// 其他线程在此处进行等待
cache.wait();
} catch (InterruptedException e) {
}
} else {
// 放置标志位到缓存中,并跳出 while 循环进行后续操作
cache.put(key, PendingGenerationMarker);
break;
}
}
while (true);
}
long id = PROXY_CLASS_COUNTER.getAndIncrement();
String pkg = null;
ClassGenerator ccp = null, ccm = null;
try {
// 创建 ClassGenerator 对象
ccp = ClassGenerator.newInstance(cl);
Set<String> worked = new HashSet<String>();
List<Method> methods = new ArrayList<Method>();
for (int i = 0; i < ics.length; i++) {
// 检测接口访问级别是否为 protected 或 privete
if (!Modifier.isPublic(ics[i].getModifiers())) {
// 获取接口包名
String npkg = ics[i].getPackage().getName();
if (pkg == null) {
pkg = npkg;
} else {
if (!pkg.equals(npkg))
// 非 public 级别的接口必须在同一个包下,否者抛出异常
throw new IllegalArgumentException("non-public interfaces from different packages");
}
}
// 添加接口到 ClassGenerator 中
ccp.addInterface(ics[i]);
// 遍历接口方法
for (Method method : ics[i].getMethods()) {
// 获取方法描述,可理解为方法签名
String desc = ReflectUtils.getDesc(method);
// 如果方法描述字符串已在 worked 中,则忽略。考虑这种情况,
// A 接口和 B 接口中包含一个完全相同的方法
if (worked.contains(desc))
continue;
worked.add(desc);
int ix = methods.size();
// 获取方法返回值类型
Class<?> rt = method.getReturnType();
// 获取参数列表
Class<?>[] pts = method.getParameterTypes();
// 生成 Object[] args = new Object[1...N]
StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
for (int j = 0; j < pts.length; j++)
// 生成 args[1...N] = ($w)$1...N;
code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
// 生成 InvokerHandler 接口的 invoker 方法调用语句,如下:
// Object ret = handler.invoke(this, methods[1...N], args);
code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
// 返回值不为 void
if (!Void.TYPE.equals(rt))
// 生成返回语句,形如 return (java.lang.String) ret;
code.append(" return ").append(asArgument(rt, "ret")).append(";");
methods.add(method);
// 添加方法名、访问控制符、参数列表、方法代码等信息到 ClassGenerator 中
ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
}
}
if (pkg == null)
pkg = PACKAGE_NAME;
// 构建接口代理类名称:pkg + ".proxy" + id,比如 org.apache.dubbo.proxy0
String pcn = pkg + ".proxy" + id;
ccp.setClassName(pcn);
ccp.addField("public static java.lang.reflect.Method[] methods;");
// 生成 private java.lang.reflect.InvocationHandler handler;
ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
// 为接口代理类添加带有 InvocationHandler 参数的构造方法,比如:
// porxy0(java.lang.reflect.InvocationHandler arg0) {
// handler=$1;
// }
ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
// 为接口代理类添加默认构造方法
ccp.addDefaultConstructor();
// 生成接口代理类
Class<?> clazz = ccp.toClass();
clazz.getField("methods").set(null, methods.toArray(new Method[0]));
// 构建 Proxy 子类名称,比如 Proxy1,Proxy2 等
String fcn = Proxy.class.getName() + id;
ccm = ClassGenerator.newInstance(cl);
ccm.setClassName(fcn);
ccm.addDefaultConstructor();
ccm.setSuperClass(Proxy.class);
// 为 Proxy 的抽象方法 newInstance 生成实现代码,形如:
// public Object newInstance(java.lang.reflect.InvocationHandler h) {
// return new org.apache.dubbo.proxy0($1);
// }
ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
// 生成 Proxy 实现类
Class<?> pc = ccm.toClass();
// 通过反射创建 Proxy 实例
proxy = (Proxy) pc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
if (ccp != null)
// 释放资源
ccp.release();
if (ccm != null)
ccm.release();
synchronized (cache) {
if (proxy == null)
cache.remove(key);
else
// 写缓存
cache.put(key, new WeakReference<Proxy>(proxy));
// 唤醒其他等待线程
cache.notifyAll();
}
}
return proxy;
}
比较复杂,需要根据注释仔细研读。以上就是服务的调用重要源码。
五、负载均衡
Dubbo 提供了4种负载均衡实现,分别是:
- 基于权重随机算法的 RandomLoadBalance
- 基于最少活跃调用数算法的 LeastActiveLoadBalance
- 基于 hash 一致性的 ConsistentHashLoadBalance
- 基于加权轮询算法的 RoundRobinLoadBalance
Dubbo 中,所有负载均衡实现类均继承自 AbstractLoadBalance,该类实现了 LoadBalance 接口,并封装了一些公共的逻辑。select() 是负载均衡的入口方法。select() 中逻辑很简单主要是调用 SPI 实现类的 soSelect() 方法完成负载均衡,下面依次介绍四种负载均衡算法的实现:
5.1 基于权重随机算法的 RandomLoadBalance
public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random";
private final Random random = new Random();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size();
int totalWeight = 0;
boolean sameWeight = true;
// 下面这个循环有两个作用,第一是计算总权重 totalWeight,
// 第二是检测每个服务提供者的权重是否相同
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
// 累加权重
totalWeight += weight;
// 检测当前服务提供者的权重与上一个服务提供者的权重是否相同,
// 不相同的话,则将 sameWeight 置为 false。
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
// 下面的 if 分支主要用于获取随机数,并计算随机数落在哪个区间上
if (totalWeight > 0 && !sameWeight) {
// 随机获取一个 [0, totalWeight) 区间内的数字
int offset = random.nextInt(totalWeight);
// 循环让 offset 数减去服务提供者权重值,当 offset 小于0时,返回相应的 Invoker。
// 举例说明一下,我们有 servers = [A, B, C],weights = [5, 3, 2],offset = 7。
// 第一次循环,offset - 5 = 2 > 0,即 offset > 5,
// 表明其不会落在服务器 A 对应的区间上。
// 第二次循环,offset - 3 = -1 < 0,即 5 < offset < 8,
// 表明其会落在服务器 B 对应的区间上
for (int i = 0; i < length; i++) {
// 让随机值 offset 减去权重值
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
// 返回相应的 Invoker
return invokers.get(i);
}
}
}
// 如果所有服务提供者权重值相同,此时直接随机返回一个即可
return invokers.get(random.nextInt(length));
}
}
它的算法思想很简单。假设我们有一组服务器 servers = [A, B, C],他们对应的权重为 weights = [5, 3, 2],权重总和为10。现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。
5.2 基于最少活跃调用数算法的 LeastActiveLoadBalance
public class LeastActiveLoadBalance extends AbstractLoadBalance {
public static final String NAME = "leastactive";
private final Random random = new Random();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size();
// 最小的活跃数
int leastActive = -1;
// 具有相同“最小活跃数”的服务者提供者(以下用 Invoker 代称)数量
int leastCount = 0;
// leastIndexs 用于记录具有相同“最小活跃数”的 Invoker 在 invokers 列表中的下标信息
int[] leastIndexs = new int[length];
int totalWeight = 0;
// 第一个最小活跃数的 Invoker 权重值,用于与其他具有相同最小活跃数的 Invoker 的权重进行对比,
// 以检测是否“所有具有相同最小活跃数的 Invoker 的权重”均相等
int firstWeight = 0;
boolean sameWeight = true;
// 遍历 invokers 列表
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// 获取 Invoker 对应的活跃数
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// 获取权重 - ⭐️
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
// 发现更小的活跃数,重新开始
if (leastActive == -1 || active < leastActive) {
// 使用当前活跃数 active 更新最小活跃数 leastActive
leastActive = active;
// 更新 leastCount 为 1
leastCount = 1;
// 记录当前下标值到 leastIndexs 中
leastIndexs[0] = i;
totalWeight = weight;
firstWeight = weight;
sameWeight = true;
// 当前 Invoker 的活跃数 active 与最小活跃数 leastActive 相同
} else if (active == leastActive) {
// 在 leastIndexs 中记录下当前 Invoker 在 invokers 集合中的下标
leastIndexs[leastCount++] = i;
// 累加权重
totalWeight += weight;
// 检测当前 Invoker 的权重与 firstWeight 是否相等,
// 不相等则将 sameWeight 置为 false
if (sameWeight && i > 0
&& weight != firstWeight) {
sameWeight = false;
}
}
}
// 当只有一个 Invoker 具有最小活跃数,此时直接返回该 Invoker 即可
if (leastCount == 1) {
return invokers.get(leastIndexs[0]);
}
// 有多个 Invoker 具有相同的最小活跃数,但它们之间的权重不同
if (!sameWeight && totalWeight > 0) {
// 随机生成一个 [0, totalWeight) 之间的数字
int offsetWeight = random.nextInt(totalWeight);
// 循环让随机数减去具有最小活跃数的 Invoker 的权重值,
// 当 offset 小于等于0时,返回相应的 Invoker
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexs[i];
// 获取权重值,并让随机数减去权重值 - ⭐️
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
if (offsetWeight <= 0)
return invokers.get(leastIndex);
}
}
// 如果权重相同或权重为0时,随机返回一个 Invoker
return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}
}
代码过程如下:
- 遍历 invokers 列表,寻找活跃数最小的 Invoker
- 如果有多个 Invoker 具有相同的最小活跃数,此时记录下这些 Invoker
- 在 invokers 集合中的下标,并累加它们的权重,比较它们的权重值是否相等
- 如果只有一个 Invoker 具有最小的活跃数,此时直接返回该 Invoker 即可
- 如果有多个 Invoker 具有最小活跃数,且它们的权重不相等,此时处理方式和 RandomLoadBalance 一致
- 如果有多个 Invoker 具有最小活跃数,但它们的权重相等,此时随机返回一个即可
从代码可以看出来,除了最小活跃数,LeastActiveLoadBalance 在实现上还引入了权重值。所以准确的来说,LeastActiveLoadBalance 是基于加权最小活跃数算法实现的。举个例子说明一下,在一个服务提供者集群中,有两个性能优异的服务提供者。某一时刻它们的活跃数相同,此时 Dubbo 会根据它们的权重去分配请求,权重越大,获取到新请求的概率就越大。如果两个服务提供者权重相同,此时随机选择一个即可。
5.3 基于 hash 一致性的 ConsistentHashLoadBalance
一致性hash算法原理是:首先根据 ip 或者其他的信息为缓存节点生成一个 hash,并将这个 hash 投射到 [0, 232 - 1] 的圆环上。当有查询或写入请求时,则为缓存项的 key 生成一个 hash 值。然后查找第一个大于或等于该 hash 值的缓存节点,并到这个节点中查询或写入缓存项。如果当前节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其 hash 值的缓存节点即可。
dubbo 对其的实现类 ConsistentHashLoadBalance的 doSelect()方法只做了一些前置工作,真正实现在 ConsistentHashSelector 的 select方法中,其中 virtualInvokers 是一个 treeMap,记录了各个节点的虚拟节点的 hash -> Invoker,初始化工作在 ConsistentHashSelector 的构造方法中完成,由于篇幅原因没有贴出来,下面的 select() 算法的实际步骤:
public Invoker<T> select(Invocation invocation) {
// 将参数转为 key
String key = toKey(invocation.getArguments());
// 对参数 key 进行 md5 运算
byte[] digest = md5(key);
// 取 digest 数组的前四个字节进行 hash 运算,再将 hash 值传给 selectForKey 方法,
// 寻找合适的 Invoker
return selectForKey(hash(digest, 0));
}
private Invoker<T> selectForKey(long hash) {
// 到 TreeMap 中查找第一个节点值大于或等于当前 hash 的 Invoker
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
// 如果 hash 大于 Invoker 在圆环上最大的位置,此时 entry = null,
// 需要将 TreeMap 的头节点赋值给 entry
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
// 返回 Invoker
return entry.getValue();
}
5.4 基于加权轮询算法的 RoundRobinLoadBalance
public class RoundRobinLoadBalance extends AbstractLoadBalance {
public static final String NAME = "roundrobin";
private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
private final ConcurrentMap<String, AtomicPositiveInteger> indexSeqs = new ConcurrentHashMap<String, AtomicPositiveInteger>();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int length = invokers.size();
int maxWeight = 0;
int minWeight = Integer.MAX_VALUE;
final List<Invoker<T>> invokerToWeightList = new ArrayList<>();
// 查找最大和最小权重
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
maxWeight = Math.max(maxWeight, weight);
minWeight = Math.min(minWeight, weight);
if (weight > 0) {
invokerToWeightList.add(invokers.get(i));
}
}
// 获取当前服务对应的调用序列对象 AtomicPositiveInteger
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
// 创建 AtomicPositiveInteger,默认值为0
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}
// 获取下标序列对象 AtomicPositiveInteger
AtomicPositiveInteger indexSeq = indexSeqs.get(key);
if (indexSeq == null) {
// 创建 AtomicPositiveInteger,默认值为 -1
indexSeqs.putIfAbsent(key, new AtomicPositiveInteger(-1));
indexSeq = indexSeqs.get(key);
}
if (maxWeight > 0 && minWeight < maxWeight) {
length = invokerToWeightList.size();
while (true) {
int index = indexSeq.incrementAndGet() % length;
int currentWeight = sequence.get() % maxWeight;
// 每循环一轮(index = 0),重新计算 currentWeight
if (index == 0) {
currentWeight = sequence.incrementAndGet() % maxWeight;
}
// 检测 Invoker 的权重是否大于 currentWeight,大于则返回
if (getWeight(invokerToWeightList.get(index), invocation) > currentWeight) {
return invokerToWeightList.get(index);
}
}
}
// 所有 Invoker 权重相等,此时进行普通的轮询即可
return invokers.get(sequence.incrementAndGet() % length);
}
}
代码逻辑是这样的:每进行一轮循环,重新计算 currentWeight。如果当前 Invoker 权重大于 currentWeight,则返回该 Invoker。下面举例说明,假设服务器 [A, B, C] 对应权重 [5, 2, 1]。
- 第一轮循环,currentWeight = 1,可返回 A 和 B
- 第二轮循环,currentWeight = 2,返回 A
- 第三轮循环,currentWeight = 3,返回 A
- 第四轮循环,currentWeight = 4,返回 A
- 第五轮循环,currentWeight = 0,返回 A, B, C
如上,这里的一轮循环是指 index 再次变为0所经历过的循环,这里可以把 index = 0 看做是一轮循环的开始。每一轮循环的次数与 Invoker 的数量有关,Invoker 数量通常不会太多,所以我们可以认为上面代码的时间复杂度为常数级。
以上就是 dubbo 四种负载均衡算法的实现
六、总结
Dubbo 是目前最流行的 rpc 框架之一,也是面试过程中经常遇到的面试题,了解其中的原理有利于更好的使用,更能化解面试中的尴尬,还能丰富自身的编程经验和架构经验。dubbo 还更多的源码值得深入分析,但服务提供和服务调用是其中比较重要的组成部分,其他还包括:通信协议、服务治理、服务路由、高可用集群搭建模式等等。将在以后逐步解析。