Dubbo之服务消费原理
前言
上篇文章《dubbo之服务暴露》分析 dubbo 服务是如何暴露的,本文接着分析 dubbo 服务的消费流程。主要从以下几个方面进行分析:注册中心的暴露;通过注册中心进行服务消费通知;直连服务进行消费。
服务消费端启动时,将自身的信息注册到注册中心的目录,同时还订阅服务提供方的目录,当服务提供方的 url 发生更改时,实时获取新的数据。
服务消费端流程
下面是一个服务消费的流程图:
上图中可以看到,服务消费的流程与服务暴露的流程有点类似逆向的。同样,dubbo 服务也是分为两个大步骤:第一步就是将远程服务通过protocol
转换成invoker
(概念在上篇文章中有解释)。第二步通过动态代理将invoker
转换成消费服务需要的接口。
org.apache.dubbo.config.referenceconfig
类是referencebean
的父类,与生产端服务的servicebean
一样,存放着解析出来的 xml 和注解信息。类关系如下:
服务初始化中转换的入口
当我们消费端调用本地接口就能实现远程服务的调用,这是怎么实现的呢?根据上面的流程图,来分析消费原理。
在消费端进行初始化时referenceconfig#init
,会执行referenceconfig#createproxy
来完成这一系列操作。以下为referenceconfig#createproxy
主要的代码部分:
private t createproxy(map<string, string> map) { // 判断是否为 jvm 本地引用 if (shouldjvmrefer(map)) { // 通过 injvm 协议,获取本地服务 url url = new url(local_protocol, localhost_value, 0, interfaceclass.getname()).addparameters(map); invoker = ref_protocol.refer(interfaceclass, url); } else { urls.clear(); // 判断是否有自定义的直连地址,或注册中心地址 if (url != null && url.length() > 0) { string[] us = semicolon_split_pattern.split(url); if (us != null && us.length > 0) { for (string u : us) { url url = url.valueof(u); if (stringutils.isempty(url.getpath())) { url = url.setpath(interfacename); } if (urlutils.isregistry(url)) { // 如果是注册中心protocol类型,则向地址中添加 refer 服务消费元数据 urls.add(url.addparameterandencoded(refer_key, stringutils.toquerystring(map))); } else { // 直连服务提供端 urls.add(clusterutils.mergeurl(url, map)); } } } } else { // 组装注册中心的配置 if (!local_protocol.equalsignorecase(getprotocol())) { // 检查配置中心 checkregistry(); list<url> us = configvalidationutils.loadregistries(this, false); if (collectionutils.isnotempty(us)) { for (url u : us) { url monitorurl = configvalidationutils.loadmonitor(this, u); if (monitorurl != null) { // 监控上报信息 map.put(monitor_key, url.encode(monitorurl.tofullstring())); } // 注册中心地址添加 refer 服务消费元数据 urls.add(u.addparameterandencoded(refer_key, stringutils.toquerystring(map))); } } } } // 只有一条注册中心数据,即单注册中心 if (urls.size() == 1) { // 将远程服务转化成 invoker invoker = ref_protocol.refer(interfaceclass, urls.get(0)); } else { // 因为多注册中心就会存在多个 invoker,这里用保存在 list 中 list<invoker<?>> invokers = new arraylist<invoker<?>>(); url registryurl = null; for (url url : urls) { // 将每个注册中心转换成 invoker 数据 invokers.add(ref_protocol.refer(interfaceclass, url)); if (urlutils.isregistry(url)) { // 会覆盖前遍历的注册中心,使用最后一条注册中心数据 registryurl = url; } } if (registryurl != null) { // 默认使用 zone-aware 策略来处理多个订阅 url u = registryurl.addparameterifabsent(cluster_key, zoneawarecluster.name); // 将转换后的多个 invoker 合并成一个 invoker = cluster.join(new staticdirectory(u, invokers)); } else { invoker = cluster.join(new staticdirectory(invokers)); } } } // 利用动态代理,将 invoker 转换成本地接口代理 return (t) proxy_factory.getproxy(invoker); }
上面转换的过程中,主要可概括为:先分为本地引用和远程引用两类。本地就是以 injvm 协议的获取本地服务,这不做过多说明;远程引用分为直连服务和通过注册中心。注册中心分为单注册中心和多注册中心的情况,单注册中心好解决,直接使用即可,多注册中心时,将转换后的 invoker 合并成一个 invoker。最后通过动态代理将 invoker 转换成本地接口代理。
获取 invoker 实例
由于本地服务时直接从缓存中获取,这里就注册中心的消费进行分析,上面代码片段中使用的是ref_protocol.refer
进行转换,该方法代码:
public <t> invoker<t> refer(class<t> type, url url) throws rpcexception { // 获取服务的注册中心url,里面会设置注册中心的协议和移除 registry 的参数 url = getregistryurl(url); // 获取注册中心实例 registry registry = registryfactory.getregistry(url); if (registryservice.class.equals(type)) { return proxyfactory.getinvoker((t) registry, type, url); } // 获取服务消费元数据 map<string, string> qs = stringutils.parsequerystring(url.getparameteranddecoded(refer_key)); // 从服务消费元数据中获取分组信息 string group = qs.get(group_key); if (group != null && group.length() > 0) { if ((comma_split_pattern.split(group)).length > 1 || "*".equals(group)) { // 执行 invoker 转换工作 return dorefer(getmergeablecluster(), registry, type, url); } } // 执行 invoker 转换工作 return dorefer(cluster, registry, type, url); }
上面主要是获取服务消费的注册中心实例和进行服务分组,最后调用dorefer
方法进行转换工作,以下为dorefer
的代码:
private <t> invoker<t> dorefer(cluster cluster, registry registry, class<t> type, url url) { // 创建 registrydirectory 对象 registrydirectory<t> directory = new registrydirectory<t>(type, url); // 设置注册中心 directory.setregistry(registry); // 设置协议 directory.setprotocol(protocol); // directory.geturl().getparameters() 是服务消费元数据 map<string, string> parameters = new hashmap<string, string>(directory.geturl().getparameters()); url subscribeurl = new url(consumer_protocol, parameters.remove(register_ip_key), 0, type.getname(), parameters); if (!any_value.equals(url.getserviceinterface()) && url.getparameter(register_key, true)) { directory.setregisteredconsumerurl(getregisteredconsumerurl(subscribeurl, url)); // 消费消息注册到注册中心 registry.register(directory.getregisteredconsumerurl()); } directory.buildrouterchain(subscribeurl); // 服务消费者订阅:服务提供端,动态配置,路由的通知 directory.subscribe(subscribeurl.addparameter(category_key, providers_category + "," + configurators_category + "," + routers_category)); // 多个invoker合并为一个 invoker invoker = cluster.join(directory); return invoker; }
上面实现主要是完成创建 registrydirectory 对象,将消费服务元数据注册到注册中心,通过 registrydirectory 对象里的信息,实现服务提供端,动态配置及路由的订阅相关功能。
registrydirectory 这个类实现了 notifylistener 这个通知监听接口,当订阅的服务,配置或路由发生变化时,会接收到通知,进行相应改变:
public synchronized void notify(list<url> urls) { // 将服务提供方配置,路由配置,服务提供方的服务分别以不同的 key 保存在 map 中 map<string, list<url>> categoryurls = urls.stream() .filter(objects::nonnull) .filter(this::isvalidcategory) .filter(this::isnotcompatiblefor26x) .collect(collectors.groupingby(url -> { if (urlutils.isconfigurator(url)) { return configurators_category; } else if (urlutils.isroute(url)) { return routers_category; } else if (urlutils.isprovider(url)) { return providers_category; } return ""; })); // 更新服务提供方配置 list<url> configuratorurls = categoryurls.getordefault(configurators_category, collections.emptylist()); this.configurators = configurator.toconfigurators(configuratorurls).orelse(this.configurators); // 更新路由配置 list<url> routerurls = categoryurls.getordefault(routers_category, collections.emptylist()); torouters(routerurls).ifpresent(this::addrouters); // 加载服务提供方的服务信息 list<url> providerurls = categoryurls.getordefault(providers_category, collections.emptylist()); /** * 3.x added for extend url address */ extensionloader<addresslistener> addresslistenerextensionloader = extensionloader.getextensionloader(addresslistener.class); list<addresslistener> supportedlisteners = addresslistenerextensionloader.getactivateextension(geturl(), (string[]) null); if (supportedlisteners != null && !supportedlisteners.isempty()) { for (addresslistener addresslistener : supportedlisteners) { providerurls = addresslistener.notify(providerurls, geturl(),this); } } // 重新加载 invoker 实例 refreshoverrideandinvoker(providerurls); }
registrydirectory#notify
里面最后会刷新 invoker 进行重新加载,下面是核心代码的实现:
private void refreshoverrideandinvoker(list<url> urls) { // mock zookeeper://xxx?mock=return null overridedirectoryurl(); // 刷新 invoker refreshinvoker(urls); } private void refreshinvoker(list<url> invokerurls) { assert.notnull(invokerurls, "invokerurls should not be null"); if (invokerurls.size() == 1 && invokerurls.get(0) != null && empty_protocol.equals(invokerurls.get(0).getprotocol())) { ...... } else { // 刷新之前的 invoker map<string, invoker<t>> oldurlinvokermap = this.urlinvokermap; // local reference // 加载新的 invoker map map<string, invoker<t>> newurlinvokermap = toinvokers(invokerurls);// translate url list to invoker map // 获取新的 invokers list<invoker<t>> newinvokers = collections.unmodifiablelist(new arraylist<>(newurlinvokermap.values())); // 缓存新的 invokers routerchain.setinvokers(newinvokers); this.invokers = multigroup ? tomergeinvokerlist(newinvokers) : newinvokers; this.urlinvokermap = newurlinvokermap; try { // 通过新旧 invokers 对比,销毁无用的 invokers destroyunusedinvokers(oldurlinvokermap, newurlinvokermap); // close the unused invoker } catch (exception e) { logger.warn("destroyunusedinvokers error. ", e); } } }
获取刷新前后的 invokers,将新的 invokers 重新缓存起来,通过对比,销毁无用的 invoker。
上面将 url 转换 invoker 是在registrydirectory#toinvokers
中进行。
private map<string, invoker<t>> toinvokers(list<url> urls) { map<string, invoker<t>> newurlinvokermap = new hashmap<>(); set<string> keys = new hashset<>(); string queryprotocols = this.querymap.get(protocol_key); for (url providerurl : urls) { // 过滤消费端不匹配的协议,及非法协议 ...... // 合并服务提供端配置数据 url url = mergeurl(providerurl); // 过滤重复的服务提供端配置数据 string key = url.tofullstring(); if (keys.contains(key)) { continue; } keys.add(key); // 缓存键是不与使用者端参数合并的url,无论使用者如何合并参数,如果服务器url更改,则再次引用 map<string, invoker<t>> localurlinvokermap = this.urlinvokermap; // local reference invoker<t> invoker = localurlinvokermap == null ? null : localurlinvokermap.get(key); // 缓存无对应 invoker,再次调用 protocol#refer 是否有数据 if (invoker == null) { try { boolean enabled = true; if (url.hasparameter(disabled_key)) { enabled = !url.getparameter(disabled_key, false); } else { enabled = url.getparameter(enabled_key, true); } if (enabled) { invoker = new invokerdelegate<>(protocol.refer(servicetype, url), url, providerurl); } } catch (throwable t) { logger.error("failed to refer invoker for interface:" + servicetype + ",url:(" + url + ")" + t.getmessage(), t); } // 将新的 invoker 缓存起来 if (invoker != null) { // put new invoker in cache newurlinvokermap.put(key, invoker); } } else { // 缓存里有数据,则进行重新覆盖 newurlinvokermap.put(key, invoker); } } keys.clear(); return newurlinvokermap; }
总结
通过《dubbo之服务暴露》和本文两篇文章对 dubbo 服务暴露和服务消费原理的了解。我们可以看到,不管是暴露还是消费,dubbo 都是以 invoker 为数据交换主体进行,通过对 invoker 发起调用,实现一个远程或本地的实现。
个人博客:
关注公众号 【ytao】,更多原创好文