欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

浅谈dubbo之二服务的暴露过程

程序员文章站 2024-03-23 12:45:40
...

浅谈dubbo之二服务的暴露过程

文章刚写完 dubbo 3.0 就更新了,本文基于版本 2.6.5,版本虽低,但并不过时。

一个问题

下面是我们常见的 provider配置方式,那么为什么通过如下的配置就能够将 dubbo 成功暴露呢?

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
	xmlns="http://www.springframework.org/schema/beans"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
	http://dubbo.apache.org/schema/dubbo
	http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    <dubbo:application name="demo-provider"/>
    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
    <dubbo:protocol name="dubbo" port="20880"/>
    <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
    <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
</beans>

这是因为dubbo利用了Spring的文件标签解析特性。dubbo中定义了spring.schemasspring.handlers 两个文件,当 Spring 解析到 dubbo 的标签时,就会去查找这两个文件。通过 spring.handlers 指明的解析类解析标签。

spring.schemas

http\://dubbo.apache.org/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd
http\://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/compat/dubbo.xsd

spring.handlers

http\://dubbo.apache.org/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler

从上面的spring.handlers文件我们得知 Spring实际上是通过dubboDubboNamespaceHandler类来解析标签的。接下来定位到该类。

public class DubboNamespaceHandler extends NamespaceHandlerSupport {
	// ....
	@Override
    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }
}

这个类很简单就是通过DubboBeanDefinitionParser将我们配置在xml中的信息解析到对应的实现类中。比如<dubbo:application/> 解析到 ApplicationConfig等,类多数都以 xxxConfig 命名,比较特殊的是<dubbo:service/><dubbo:reference/> 这个两个标签分别被解析成了ServiceBeanReferenceBean类。与我们服务暴露有关的就是 ServiceBean这个类。

一,前置工作

ServiceBean类

ServiceBean实现了ApplicationListener<ContextRefreshedEvent> 接口,ContextRefreshedEvent表明 ApplicationListener接口的回调函数将会在 Spring上下文刷新事件之后调用(Event raised when an {@code ApplicationContext} gets initialized or refreshed.);

ServiceBean#onApplicationEvent

前面是各种条件检查,重点是 export()

public class ServiceBean<T> extends ServiceConfig<T> implements .... ApplicationListener<ContextRefreshedEvent>{
    // Spring 上下文刷新事件之后调用
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // 服务暴露之前检查是不是延迟暴露,是不是已经暴露,是不是不需要暴露
        if (isDelay() && !isExported() && !isUnexported()) {
            // 暴露服务,这个方法实际上是父类中的方法
            export();
        }
    }
}

ServiceConfig#export

前面是各种条件检查,重点doExport()

public synchronized void export() {
     // 先对配置进行了检查,并根据配置执行相应的动作,无论是延迟暴露还是立即暴露最终都会调用doExport();
  if (delay != null && delay > 0) {
    delayExportExecutor.schedule(new Runnable() {
      @Override
      public void run() {
        doExport();
      }
    }, delay, TimeUnit.MILLISECONDS);
  } else {
    doExport();
  }
}

ServiceConfig#doExport

protected synchronized void doExport() {
    .....
    // 各种检测一大堆,其实总结一下就是:
    // 1. 检测 <dubbo:service> 标签的 interface 属性合法性,不合法则抛出异常
    // 2. 检测 ProviderConfig、ApplicationConfig 等核心配置类对象是否为空,若为空,则尝试从其他配置类对象中获取相应的实例。
    // 3. 检测并处理泛化服务和普通服务类
    // 4. 检测本地存根配置,并进行相应的处理
    // 5. 对 ApplicationConfig、RegistryConfig 等配置类进行检测,为空则尝试创建,若无法创建则抛出异常
    // 导出服务
  doExportUrls();
    .....
}

ServiceConfig#doExportUrls

private void doExportUrls() {
  // 加载服务注册中心对象,下面代码表示 dubbo 支持向多个注册中心注册,也支持注册多个协议
  List<URL> registryURLs = loadRegistries(true);
  for (ProtocolConfig protocolConfig : protocols) {
        // 遍历 protocols,并在每个协议下导出服务
    doExportUrlsFor1Protocol(protocolConfig, registryURLs);
  }
}

ServiceConfig#doExportUrlsFor1Protocol

这段代码很长,其实可以分为两部分。前半节组装 URL,后半节暴露服务。

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
  String name = protocolConfig.getName();
    // 如果协议名为空,则默认设置为 dubbo
  if (name == null || name.length() == 0) {
    name = "dubbo";
  }
    // 设置版本之类的东东
  Map<String, String> map = new HashMap<String, String>();
  map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
  map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
  map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
  if (ConfigUtils.getPid() > 0) {
    map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
  }
    // 将对象字段通过反射添加到 map 中
  appendParameters(map, application);
  appendParameters(map, module);
  appendParameters(map, provider, Constants.DEFAULT_KEY);
  appendParameters(map, protocolConfig);
  appendParameters(map, this);
  if (methods != null && !methods.isEmpty()) {
        //.....
        //这里一大堆都是在分析 <dubbo:method> 的信息,与本文关系不大。
  }

  if (ProtocolUtils.isGeneric(generic)) {
    map.put(Constants.GENERIC_KEY, generic);
    map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
  } else {
    String revision = Version.getVersion(interfaceClass, version);
    if (revision != null && revision.length() > 0) {
      map.put("revision", revision);
    }
    String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
    if (methods.length == 0) {
      map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
    } else {
      map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
    }
  }
  if (!ConfigUtils.isEmpty(token)) {
    if (ConfigUtils.isDefault(token)) {
      map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
    } else {
      map.put(Constants.TOKEN_KEY, token);
    }
  }
  if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
    protocolConfig.setRegister(false);
    map.put("notify", "false");
  }
  String contextPath = protocolConfig.getContextpath();
  if ((contextPath == null || contextPath.length() == 0) && provider != null) {
    contextPath = provider.getContextpath();
  }
    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
  Integer port = this.findConfigedPorts(protocolConfig, name, map);
    // 组装成 URL,组装后的效果如下图
  URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
  if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);
  }

    -------------------------------------这是分界线---------------------------------------------
  // 省略的代码在导出服务中
}

拼接完的 URL是这个样子滴

浅谈dubbo之二服务的暴露过程

也是这个样子滴

dubbo://192.168.72.1:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=com.alibaba.dubbo.demo.DemoService&bind.ip=192.168.72.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=11920&qos.port=22222&side=provider&timestamp=1604049505318

接下来我们就可以通过URL将服务导出了。

二,导出服务

导出服务分为导出到本地和导出到远程,而后者又分为是否包含注册中心

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    // ...... 省略 URL 组装过程
    String scope = url.getParameter(Constants.SCOPE_KEY);
    // 根据 scope 决定导出逻辑
    // 1, scope = none,不导出服务
    // 2, scope != remote,导出到本地
    // 3, scope != local,导出到远程
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            // 导出到本地
            exportLocal(url);
        }
        // 导出到远程
        if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
            // 有注册中心怎向注册中心注册并暴露
            if (registryURLs != null && !registryURLs.isEmpty()) {
                for (URL registryURL : registryURLs) {
                    url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                     // 监控器
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    String proxy = url.getParameter(Constants.PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                    }
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            } else {
                // 没有注册中心,直连
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
     }
    }
    this.urls.add(url);
}

本地导出

为什么要本地暴露服务

可能存在服务提供方和服务消费方在同一个JVM的情况,将服务提供方暴露在本地JVM下可以让服务消费方直接调用,这样避免了远程调用的网络开销。

本地服务暴露原理

ServiceConfig#exportLocal

private void exportLocal(URL url) {
  if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        // 拼装本地暴露 URL,指定为 injvm 协议,这个协议不会打开端口而是把服务保存在内存中
        // 拼装后的效果如下图
    URL local = URL.valueOf(url.toFullString())
        .setProtocol(Constants.LOCAL_PROTOCOL)
        .setHost(LOCALHOST)  
        .setPort(0);
    // 这个地方用了两次自适应扩展,咱们在 2.1.2.1 和 2.1.2.2 中详说
        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, local);
    Exporter<?> exporter = protocol.export(invoker);
    exporters.add(exporter);
  }
}

浅谈dubbo之二服务的暴露过程
proxyFactory#getInvoker

@SPI("javassist")
public interface ProxyFactory {
    //.....
    @Adaptive({Constants.PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, local);

上面的代码会根据动态扩展机制生成适配器类ProxyFactory$Adaptive 从这个适配器类中我们得知最终会调用JavassistProxyFactory#getInvoker 进行代理,并生成Invoker

// 这类生成的 XXXX$Adaptive 适配器类,也可以在其包路径下新建 xxx.java 文件进行 debug
public class ProxyFactory$Adaptive implements com.alibaba.dubbo.rpc.ProxyFactory {
    // 只保留重要代码
    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
         // ..... 
        com.alibaba.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        // 根据extName找到具体适应类,然后调用方法
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }
}
public class JavassistProxyFactory extends AbstractProxyFactory {
    //....
    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

protocol#export

@SPI("dubbo")
public interface Protocol {
    //....
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    //....
}
Exporter<?> exporter = protocol.export(invoker);

与上同理,会通过动态生成的适配器类Protocol$Adaptive 最终调用InjvmProtocol#export 方法生成Exporter保存在暴露列表中。

远程暴露

远程服务暴露原理

这个地方分为远程暴露和向注册中心注册两部分,我们分开说,先说远程暴露再聊注册

ServiceConfig#doExportUrlsFor1Protocol

if (registryURLs != null && !registryURLs.isEmpty()) {
  for (URL registryURL : registryURLs) {
    url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
    URL monitorUrl = loadMonitor(registryURL);
    if (monitorUrl != null) {
      url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
    }
    String proxy = url.getParameter(Constants.PROXY_KEY);
    if (StringUtils.isNotEmpty(proxy)) {
      registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
    }
    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    Exporter<?> exporter = protocol.export(wrapperInvoker);
    exporters.add(exporter);
  }
}

上面代码经过 registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()); URL被组装成下面的样子。

浅谈dubbo之二服务的暴露过程

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.72.1%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bean.name%3Dcom.alibaba.dubbo.demo.DemoService%26bind.ip%3D192.168.72.1%26bind.port%3D20880%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D22220%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1604148390563&pid=22220&qos.port=22222&registry=zookeeper&timestamp=1604148390527

可以看出接下来会先根据registry协议走到RegistryProtocol#export , 然后走到RegistryProtocol#doLocalExport在这个方法中,此时的URL如下

// 其实就是 registryURL 内部的export这个key对应的 value 
dubbo://192.168.72.1:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=com.alibaba.dubbo.demo.DemoService&bind.ip=192.168.72.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1312&qos.port=22222&side=provider&timestamp=1604149443638

接下来根据dubbo://协议走到DubboProtocol#export 方法中生成Exporter并启动服务器。

DubboProtocol#export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
  URL url = invoker.getUrl();
  String key = serviceKey(url);
    //创建 DubboExporter 放入缓存,在服务提供方处理请求时会从中获取出来
  DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
  exporterMap.put(key, exporter);
    // .... 忽略本地存根相关代码
    // 启动服务器
  openServer(url);
  optimizeSerialization(url);
  return exporter;
}

其实走到openServer这一步就可以了,再往下就是使用Netty具体的通信了,只把调用的类和方法列举出来,详细的步骤就不多说了,有兴趣的您可以自己跟一下。

DubboProtocol#openServer --> 
  DubboProtocol#createServer --> 
    Exchanges#bind --> 
      HeaderExchanger#bind --> 
        Transporters#bind -->
          ...netty4.NettyTransporter#bind --> 
            ...netty4.NettyServer#doOpen

注册

注册原理

这一步实际上是在 RegistryProtocol#export 做的,接着doLocalExport往下看即可。

RegistryProtocol#export

@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
  // 省略doLocalExport 
  // 获取注册中心 URL zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService....
  URL registryUrl = getRegistryUrl(originInvoker);
  // 根据 URL 创建服务注册实例,这个地方通过扩展接口 RegistryFactory 最终会创建 ZookeeperRegistry 的实例
  final Registry registry = getRegistry(originInvoker);
  // 获取提供者的 URL dubbo://192.168.72.1:20880/com.alibaba.dubbo.demo.DemoService....
  final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); 
  boolean register = registeredProviderUrl.getParameter("register", true);
  // 将提供者信息注册到服务提供者与消费者的注册表中
  ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
  if (register) {
    // 这个地方是真正向注册中心注册了,最终会走到 ZookeeperRegistry#doRegister 方法,完成注册
    register(registryUrl, registeredProviderUrl);
    ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
  }
  // 忽略关于订阅的代码
  return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}

ZookeeperRegistry#doRegister

@Override
protected void doRegister(URL url) {
  zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
}

AbstractZookeeperClient#create

@Override
public void create(String path, boolean ephemeral) {
  if (!ephemeral) {
    if (checkExists(path)) {
      return;
    }
  }
  int i = path.lastIndexOf('/');
  if (i > 0) {
    create(path.substring(0, i), false);
  }
  if (ephemeral) {
    createEphemeral(path);
  } else {
    createPersistent(path);
  }
}

参数 path 的值是

/dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.72.1%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bean.name%3Dcom.alibaba.dubbo.demo.DemoService%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D956%26side%3Dprovider%26timestamp%3D1604290291061

我们以com.alibaba.dubbo.demo.DemoService 接口为例,首先会先调用createPersistent方法递归地创建

/dubbo
/dubbo/com.alibaba.dubbo.demo.DemoService 
/dubbo/com.alibaba.dubbo.demo.DemoService/providers 

这些节点,再通过createEphemeral创建如下内容:

[dubbo%3A%2F%2F192.168.72.1%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bean.name%3Dcom.alibaba.dubbo.demo.DemoService%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D22776%26side%3Dprovider%26timestamp%3D1604289898812]

到此,注册到注册中心就算完成了,我们看一下整体效果。

浅谈dubbo之二服务的暴露过程

直连式远程导出

这个就不分析了,过程其实已经包含在包含注册中心的暴露过程中了。

三,总结

dubbo 的服务导出看似麻烦,实际上概括的说可以分成三部分:
第一,检测配置是否为空,为空则新建,并根据配置信息组装成 URL;
第二,暴露服务,包括本地和远程;
第三,注册到注册中心;

2020年11月02日于将台