【Dubbo源码阅读系列】服务暴露之本地暴露
在上一篇文章中我们介绍 dubbo 自定义标签解析相关内容,其中我们自定义的 xml 标签 <dubbo:service />
会被解析为 servicebean 对象(传送门:dubbo xml 配置加载)。今天我们讲述的内容和 servicebean 密切相关!
细心的读者在阅读 servicebean 类时会发现 onapplicationevent() 方法和 afterpropertiesset() 方法调用了一个共同的方法 export()。直觉告诉我们这个方法应该和服务的暴露有关,我们接下来就
从 export() 方法入手分析。
export()方法调用时机
为了解答 export() 调用时机问题,我们需要关注 servicebean 类中的三个方法
- setapplicationcontext(applicationcontext applicationcontext)
servicebean 实现了 applicationcontextaware 接口,在 servicebean 初始化后,会调用 setapplicationcontext 注入 spring 上下文; - afterpropertiesset()
注入 applicationconfig、registries、protocols 等属性; - onapplicationevent(contextrefreshedevent event)
这里接受的 event 事件类型为 contextrefreshedevent。当 applicationcontext 被初始化或者刷新时,会调用该方法。
这三个方法在 spring 生命周期中被调用的顺序大致如下图所示
setapplicationcontext()——> afterpropertiesset() ——> onapplicationevent()
我们结合代码继续看
public void setapplicationcontext(applicationcontext applicationcontext) { this.applicationcontext = applicationcontext; springextensionfactory.addapplicationcontext(applicationcontext); supportedapplicationlistener = addapplicationlistener(applicationcontext, this); } public void onapplicationevent(contextrefreshedevent event) { if (!isexported() && !isunexported()) { if (logger.isinfoenabled()) { logger.info("the service ready on spring started. service: " + getinterface()); } export(); } } public void afterpropertiesset() throws exception { // 省略... if (!supportedapplicationlistener) { export(); } }
代码执行逻辑大致如下:
- 首先执行 setapplicationcontext() 方法,注入上下文。这里的 supportedapplicationlistener 用于判断 spring 是否支持 spring 监听机制。
- 执行 afterpropertiesset() 方法。如果 supportedapplicationlistener 值为 false,调用 export() 方法。
- 执行 onapplicationevent() 方法。如果没有执行过 export() 以及 unexport() 方法,调用 export() 方法。
通过上面简单的分析我们可以看到 export() 方法只会在 onapplicationevent() 和 export() 方法中调用一次。
export() 方法解析
public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getexport(); } if (delay == null) { delay = provider.getdelay(); } } if (export != null && !export) { return; } if (delay != null && delay > 0) { delayexportexecutor.schedule(new runnable() { @override public void run() { doexport(); } }, delay, timeunit.milliseconds); } else { doexport(); } }
export()方法比较简单。注意这里有个 delay 变量,我们可以使用该变量延迟执行 export() 方法。
继续看 doexport() 方法
protected synchronized void doexport() { // 省略... doexporturls(); providermodel providermodel = new providermodel(getuniqueservicename(), ref, interfaceclass); applicationmodel.initprovidermodel(getuniqueservicename(), providermodel); } private void doexporturls() { list<url> registryurls = loadregistries(true); for (protocolconfig protocolconfig : protocols) { doexporturlsfor1protocol(protocolconfig, registryurls); } }
doexport()方法省略了很多 servicebean 配置校验和初始化代码。大家有兴趣可以自行阅览。这里直接划重点!!!分析 doexporturls() 方法!!!
先看 loadregistries() 方法:
loadregistries()
protected list<url> loadregistries(boolean provider) { checkregistry(); list<url> registrylist = new arraylist<url>(); // registries 在 afterpropertiesset() 方法中初始化 if (registries != null && !registries.isempty()) { for (registryconfig config : registries) { string address = config.getaddress(); if (address == null || address.length() == 0) { address = constants.anyhost_value; } string sysaddress = system.getproperty("dubbo.registry.address"); if (sysaddress != null && sysaddress.length() > 0) { address = sysaddress; } if (address.length() > 0 && !registryconfig.no_available.equalsignorecase(address)) { map<string, string> map = new hashmap<string, string>(); // 将 application/config 部分属性整合到 map 中,详细见: appendparameters(map, application); appendparameters(map, config); map.put("path", registryservice.class.getname()); map.put("dubbo", version.getprotocolversion()); map.put(constants.timestamp_key, string.valueof(system.currenttimemillis())); if (configutils.getpid() > 0) { map.put(constants.pid_key, string.valueof(configutils.getpid())); } if (!map.containskey("protocol")) { if (extensionloader.getextensionloader(registryfactory.class).hasextension("remote")) { map.put("protocol", "remote"); } else { map.put("protocol", "dubbo"); } } // 构建 url ,返回结果类似 zookeeper://192.168.0.100:2181/org.apache.dubbo.registry.registryservice? // application=demo-provider&dubbo=2.0.2&pid=22705&qos.port=22222×tamp=1549005672530 list<url> urls = urlutils.parseurls(address, map); for (url url : urls) { // 将此时 url 的 protocol 保存到 registry 参数中 url = url.addparameter(constants.registry_key, url.getprotocol()); // 设置 url protcol 属性为 registry url = url.setprotocol(constants.registry_protocol); if ((provider && url.getparameter(constants.register_key, true)) || (!provider && url.getparameter(constants.subscribe_key, true))) { registrylist.add(url); } } } } } return registrylist; }
loadregistries() 用于加载注册中心。概括来说就是用于解析我们在配置文件中定义的 <dubbo:registry />
标签。
checkregistry() 方法用于校验注册中心配置校验,里面有一些版本兼容的代码。appendparameters() 方法详见 appendparameters() 小节。
本地暴露
介绍完 loadregistries() 方法,我们接着看 doexporturlsfor1protocol()。doexporturlsfor1protocol() 方法比较长,这里我们挑出和本地暴露相关的内容进行分析。
if (!constants.scope_none.equalsignorecase(scope)) { // export to local if the config is not remote (export to remote only when config is remote) if (!constants.scope_remote.equalsignorecase(scope)) { exportlocal(url); } if (!constants.scope_local.equalsignorecase(scope)) { // 远程暴露相关内容,省略... } } private void exportlocal(url url) { if (!constants.local_protocol.equalsignorecase(url.getprotocol())) { url local = url.valueof(url.tofullstring()) .setprotocol(constants.local_protocol) .sethost(localhost) .setport(0); exporter<?> exporter = protocol.export( proxyfactory.getinvoker(ref, (class) interfaceclass, local)); exporters.add(exporter); logger.info("export dubbo service " + interfaceclass.getname() + " to local registry"); } }
看到 exportlocal() 方法,意味着我们已经快要直达本地服务暴露的核心了!更令人按捺不住的是!这里又用到了 dubbo 中的 spi 机制(详见系列第一篇dubbo spi)。让我们看看这里到底做了什么?
private static final protocol protocol = extensionloader.getextensionloader(protocol.class).getadaptiveextension(); private static final proxyfactory proxyfactory = extensionloader.getextensionloader(proxyfactory.class).getadaptiveextension();
熟悉的配方熟悉的料,在这里我们获取了 protocol 和 proxyfactory 对应的自适应扩展类。根据方法调用的嵌套逻辑,先来看 proxyfactory 自适应扩展类 proxyfactory$adaptive 的 getinvoker() 方法。
核心方法 proxyfactory.getinvoker()
public class proxyfactory$adaptive implements org.apache.dubbo.rpc.proxyfactory { public org.apache.dubbo.rpc.invoker getinvoker(java.lang.object arg0, java.lang.class arg1, org.apache.dubbo.common.url arg2) throws org.apache.dubbo.rpc.rpcexception { if (arg2 == null) throw new illegalargumentexception("url == null"); org.apache.dubbo.common.url url = arg2; string extname = url.getparameter("proxy", "javassist"); if(extname == null) throw new illegalstateexception("fail to get extension(org.apache.dubbo.rpc.proxyfactory) name from url(" + url.tostring() + ") use keys([proxy])"); org.apache.dubbo.rpc.proxyfactory extension = null; try { extension = (org.apache.dubbo.rpc.proxyfactory)extensionloader.getextensionloader(org.apache.dubbo.rpc.proxyfactory.class).getextension(extname); }catch(exception e){ if (count.incrementandget() == 1) { logger.warn("failed to find extension named " + extname + " for type org.apache.dubbo.rpc.proxyfactory, will use default extension javassist instead.", e); } extension = (org.apache.dubbo.rpc.proxyfactory)extensionloader.getextensionloader(org.apache.dubbo.rpc.proxyfactory.class).getextension("javassist"); } return extension.getinvoker(arg0, arg1, arg2); } }
这里我们实际会去调用 stubproxyfactorywrapper 包装类的 getinvoker() 方法,如果不明白可以先看下 【dubbo源码阅读系列】之 dubbo spi 机制。
public class stubproxyfactorywrapper implements proxyfactory { public <t> invoker<t> getinvoker(t proxy, class<t> type, url url) throws rpcexception { return proxyfactory.getinvoker(proxy, type, url); } } public class javassistproxyfactory extends abstractproxyfactory { public <t> invoker<t> getinvoker(t proxy, class<t> type, url url) { // todo wrapper cannot handle this scenario correctly: the classname contains '$' final wrapper wrapper = wrapper.getwrapper(proxy.getclass().getname().indexof('$') < 0 ? proxy.getclass() : type); return new abstractproxyinvoker<t>(proxy, type, url) { @override protected object doinvoke(t proxy, string methodname, class<?>[] parametertypes, object[] arguments) throws throwable { return wrapper.invokemethod(proxy, methodname, parametertypes, arguments); } }; } }
结合上面的代码我们发现,发现最后调用的是 javassistproxyfactory 类的 getinvoker() 方法。其中 wrapper 是动态生成的代理对象。最后返回一个 abstractproxyinvoker 对象,doinvoke() 方法会调用 wrapper 代理类的 invokemethod() 方法,其中 invokemethod() 方法大概如下所示:
public object invokemethod(object o, string n, class[] p, object[] v) throws java.lang.reflect.invocationtargetexception { org.apache.dubbo.demo.provider.demoserviceimpl w; try { w = ((org.apache.dubbo.demo.provider.demoserviceimpl) $1); } catch (throwable e) { throw new illegalargumentexception(e); } try { if ("sayhello".equals($2) && $3.length == 1) { return ($w) w.sayhello((java.lang.string) $4[0]); } } catch (throwable e) { throw new java.lang.reflect.invocationtargetexception(e); } throw new org.apache.dubbo.common.bytecode.nosuchmethodexception("not found method \"" + $2 + "\" in class org.apache.dubbo.demo.provider.demoserviceimpl."); }
稍微有一点绕,至少我们已经看完了 proxyfactory.getinvoker() 方法了,我们获取到了一个包装了动态代理类的 abstractproxyinvoker 对象。接下来继续看 protocol.export() 方法。
核心方法 protocol.export()
public org.apache.dubbo.rpc.exporter export(org.apache.dubbo.rpc.invoker arg0) throws org.apache.dubbo.rpc.rpcexception { if (arg0 == null) throw new illegalargumentexception("org.apache.dubbo.rpc.invoker argument == null"); if (arg0.geturl() == null) throw new illegalargumentexception("org.apache.dubbo.rpc.invoker argument geturl() == null");org.apache.dubbo.common.url url = arg0.geturl(); string extname = ( url.getprotocol() == null ? "dubbo" : url.getprotocol() ); if(extname == null) throw new illegalstateexception("fail to get extension(org.apache.dubbo.rpc.protocol) name from url(" + url.tostring() + ") use keys([protocol])"); org.apache.dubbo.rpc.protocol extension = null; try { extension = (org.apache.dubbo.rpc.protocol)extensionloader.getextensionloader(org.apache.dubbo.rpc.protocol.class).getextension(extname); }catch(exception e){ if (count.incrementandget() == 1) { logger.warn("failed to find extension named " + extname + " for type org.apache.dubbo.rpc.protocol, will use default extension dubbo instead.", e); } extension = (org.apache.dubbo.rpc.protocol)extensionloader.getextensionloader(org.apache.dubbo.rpc.protocol.class).getextension("dubbo"); } return extension.export(arg0); }
由于此时的 url 中 protocol 值为 injvm(url 经过 setprotocol(local_protocol) 操作后 protocol 已经更新为 injvm),因此我们这里获得的扩展类实际为包装了 injvmprotocol 的包装类对象,对 wrapper 类有疑问的可以看下【dubbo源码阅读系列】之 dubbo spi 机制。
这里会涉及到一个方法 buildinvokerchain() 方,道它用于构建一个调用链。
整体调用时序简图如下所示:
最后 exportlocal() 方法中获取到的是一个 injvmexporter 对象,并将其添加到 serviceconfig 类的 exporters 集合中。
buildinvokerchain()
protocolfilterwrapper.java private static <t> invoker<t> buildinvokerchain(final invoker<t> invoker, string key, string group) { invoker<t> last = invoker; list<filter> filters = extensionloader.getextensionloader(filter.class).getactivateextension(invoker.geturl(), key, group); if (!filters.isempty()) { for (int i = filters.size() - 1; i >= 0; i--) { final filter filter = filters.get(i); final invoker<t> next = last; last = new invoker<t>() { // 省略 invoker 构建代码... @override public result invoke(invocation invocation) throws rpcexception { return filter.invoke(next, invocation); } // 省略 invoker 构建代码... }; } } return last; }
buildinvokerchain() 方法用于构建调用链,初步浏览下来发现调用链应该是由 filter 扩展类构成。那么这些 filter 扩展类又从何而来呢?这行代码很关键!!!
list<filter> filters = extensionloader.getextensionloader(filter.class).getactivateextension(invoker.geturl(), key, group);
对于这段代码我们应该有很强的亲切感,但仔细看又稍稍有所不同。实际上被 @activate 注解标记的扩展类会被加载到 extensionloader 类的 cachedactivates 集合中。
我们在调用 extensionloader 类的 getactivateextension() 时,会根据我们传入的 key 和 group 值从 cachedactivates 集合中获取满足当前条件的 filter 对象。
拿到 filters 集合后,会用链表的形式拼接 filter 调用链,举个例子:
假设当前获取到的 filters 集合中保存的 filter 对象为 filter0、filter1、filter2。我们对 filters 集合进行倒序遍历。最后获得的 last 其实为新建的 ivk2 对象。如果我们调用 last 的 invoke 方法,调用链如下图所示:
end
本文介绍了 export() 方法被调用的时机以及基本流程。并且花了一定篇幅对 dubbo 服务本地暴露进行了分析。其中掺杂了不少代码的分析,可能没有面面俱到吧。还是建议大家自己自己 debug 一下,很多东西瞬间秒懂,有助于源码理解。下一篇文章我们介绍 dubbo 服务远程暴露。
appendproperties()
protected static void appendproperties(abstractconfig config) { if (config == null) { return; } // gettagname:获取去除了 bean/config 结尾的小写类名(applicationconfig->application) string prefix = "dubbo." + gettagname(config.getclass()) + "."; method[] methods = config.getclass().getmethods(); for (method method : methods) { try { string name = method.getname(); // 1、方法长度大于3;2、方法以 set 开头;3、方法修饰符类型为 public;4、形参个数为 1;5、形参类型为基本类型 if (name.length() > 3 && name.startswith("set") && modifier.ispublic(method.getmodifiers()) && method.getparametertypes().length == 1 && isprimitive(method.getparametertypes()[0])) { // cameltosplitname: 举个例子 applicationconfig——>application.config string property = stringutils.cameltosplitname(name.substring(3, 4).tolowercase() + name.substring(4), "."); string value = null; if (config.getid() != null && config.getid().length() > 0) { // 拼接属性名称,并尝试获取对应属性 string pn = prefix + config.getid() + "." + property; value = system.getproperty(pn); if (!stringutils.isblank(value)) { logger.info("use system property " + pn + " to config dubbo"); } } if (value == null || value.length() == 0) { // 比如当前 config 为 applicationconfig,pn = dubbo.application.xxx string pn = prefix + property; value = system.getproperty(pn); if (!stringutils.isblank(value)) { logger.info("use system property " + pn + " to config dubbo"); } } if (value == null || value.length() == 0) { method getter; try { getter = config.getclass().getmethod("get" + name.substring(3)); } catch (nosuchmethodexception e) { try { getter = config.getclass().getmethod("is" + name.substring(3)); } catch (nosuchmethodexception e2) { getter = null; } } if (getter != null) { if (getter.invoke(config) == null) { // 尝试使用 configutils.getproperty() 方法获取属性值 // 尝试从 dubbo.properties.file 文件或 dubbo.properties 文件中读取属性 if (config.getid() != null && config.getid().length() > 0) { value = configutils.getproperty(prefix + config.getid() + "." + property); } if (value == null || value.length() == 0) { value = configutils.getproperty(prefix + property); } if (value == null || value.length() == 0) { string legacykey = legacyproperties.get(prefix + property); if (legacykey != null && legacykey.length() > 0) { value = convertlegacyvalue(legacykey, configutils.getproperty(legacykey)); } } } } } if (value != null && value.length() > 0) { method.invoke(config, convertprimitive(method.getparametertypes()[0], value)); } } } catch (exception e) { logger.error(e.getmessage(), e); } } }
appendparameters()
protected static void appendparameters(map<string, string> parameters, object config) { appendparameters(parameters, config, null); } protected static void appendparameters(map<string, string> parameters, object config, string prefix) { if (config == null) { return; } method[] methods = config.getclass().getmethods(); // 遍历 config 类方法集合 for (method method : methods) { try { string name = method.getname(); // 找到满足以下的方法:以set/is 开头,非 getclass;方法修饰符为 public;方法参数个数为 0;返回类型为基本类型 if ((name.startswith("get") || name.startswith("is")) && !"getclass".equals(name) && modifier.ispublic(method.getmodifiers()) && method.getparametertypes().length == 0 && isprimitive(method.getreturntype())) { // 获取 parameter 注解 parameter parameter = method.getannotation(parameter.class); // @parameter(excluded = true),直接跳过 if (method.getreturntype() == object.class || parameter != null && parameter.excluded()) { continue; } int i = name.startswith("get") ? 3 : 2; string prop = stringutils.cameltosplitname(name.substring(i, i + 1).tolowercase() + name.substring(i + 1), "."); string key; if (parameter != null && parameter.key().length() > 0) { key = parameter.key(); } else { key = prop; } // 利用反射调用 config 类中的 get/is 方法 object value = method.invoke(config); string str = string.valueof(value).trim(); if (value != null && str.length() > 0) { // 是否需要转义,utf-8 if (parameter != null && parameter.escaped()) { str = url.encode(str); } if (parameter != null && parameter.append()) { string pre = parameters.get(constants.default_key + "." + key); if (pre != null && pre.length() > 0) { str = pre + "," + str; } pre = parameters.get(key); if (pre != null && pre.length() > 0) { str = pre + "," + str; } } if (prefix != null && prefix.length() > 0) { key = prefix + "." + key; } // key/value 添加到 parameters 集合 parameters.put(key, str); } else if (parameter != null && parameter.required()) { throw new illegalstateexception(config.getclass().getsimplename() + "." + key + " == null"); } // 方法名为 getparameters();方法修饰符为 public;方法形参个数为0;返回类型为 map } else if ("getparameters".equals(name) && modifier.ispublic(method.getmodifiers()) && method.getparametertypes().length == 0 && method.getreturntype() == map.class) { map<string, string> map = (map<string, string>) method.invoke(config, new object[0]); if (map != null && map.size() > 0) { string pre = (prefix != null && prefix.length() > 0 ? prefix + "." : ""); for (map.entry<string, string> entry : map.entryset()) { parameters.put(pre + entry.getkey().replace('-', '.'), entry.getvalue()); } } } } catch (exception e) { throw new illegalstateexception(e.getmessage(), e); } } }
该方法会调用当前类对象的 isxxx/getxxx 方法(非 getclass 方法;方法修饰符为 public;形参个数为 0;返回类型为基本类型),获取其返回值构造键值对添加到指定 map 集合中;同时也会解析 getparameters() 返回的结果,构造键值对注入到 map 集合中。
本blog上原创文章未经本人许可,不得用于商业用途及传统媒体。网络媒体转载请注明出处,否则属于侵权行为。
https://juejin.im/post/5c2b7ab46fb9a049d236273b