dubbo源码解析(五)rpc模块服务发布
目录
前言
在上面两篇有关dubbo服务发布和服务消费的过程中我们主要讲述了如何将所有的配置文件转换为对应的Dubbo URL,虽然也提及了ProxyFactory和Invoker但是也是浅尝辄止,对于服务发布或者消费具体原理并没有,这里我们使用一篇(不一定)来探究一个dubbo的服务发布和远程的rpc模块,通过对这个模块的学习将使我们对服务的发布和调用更加清晰。
此篇博文重点介绍服务发布的详细介绍, dubbo版本基于2.6.2
一、rpc模块常用类
- Invoker类是Dubbo核心模型呢,dubbo进行rpc调用的具体调用执行类,是其dubbo核心基础、对于服务方它是服务调用提供者,对于消费方它是服务的远程调用。该对象是通过ProxyFactory对象创建而来。在dubbo中默认实现为JavassistProxyFactory。
- Protocol:该类是dubbo进行底层服务之底层通讯协议,dubbo默认使用dubbo协议使用更高效的netty框架进行通信。
- 对于服务提供者执行export方法:启动xxxServer,将服务执行对象【invoker】暴露在网络中
- 对于服务消费者调用refer方法:创建xxxClient,封装为调用对象【invoker】供consumer调用
二、服务提供者
这里我们先通过服务提供者的相关服务发布过程了解其相关逻辑,话不多少上代码
//1、创建代理对象Invoker(AbstractProxyInvoker)
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//2、AbstractProxyInvoker对象包装成DelegateProviderMetaDataInvoker 添加了一个本身ServiceConfig实例作为metadata
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//3、服务发将服务执行对象【invoker】以对应的协议(dubbo协议)暴露在网络中
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
如上代码在ServiceConfig的export中最终将服务注册发布出去,首先下通过ProxyFactory(实现类为:JavassistProxyFactory)getInvoker获取服务调用实体对象Invoker
2.1、getInvoker方法
/**
* 获取服务调用方法实体Invoker
* @param proxy 接口服务实现对象(对外暴露的服务实例对象 该例子中为UserServiceImpl)
* @param type 接口类class
* @param url 发布的协议
1、注册中心为zk的为
registry://xxx:xxxx/com.alibaba.dubbo.registry.RegistryService
?application=demo-core&dubbo=2.6.2&export=dubbo://127.0.0.1/com.xiu.dubbo.service.UserService...
2、本地暴露(dubbo 2.2以后默认都会进行本地服务暴露)
injvm://127.0.0.1/com.xiu.dubbo.service.UserService?anyhost=true&application=demo-core...
* @param <T> 返回对应的invoker
* @return
*/
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
//此处对服务提供方进行代理增强,此处使用了包装模式,在其中的makeWrapper为服务提供方
//添加一些方法,属性字段等,核心添加了invokMethod方法,服务提供方所有接口方法都会在该方法中进行调用
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
//根据上述的包装类创建一个AbstractProxyInvoker,服务调用方消费对应方法会调用wrapper类的
//invokeMethod进行接口方法调用(因为wrapper动态将接口方法都封装到了invokeMethod中)
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
上述对象主要是通过Wrapper创建一个有关服务发布者实现类的代理对项,动态生成了一个核心方法InvokeMethod()方法,getWrapper方法通过装饰模式动态添加一些属性字段和方法
- 属性字段信息
字段 | 描述 | 涉及方法 |
String[] pns; |
服务接口对外暴露的属性名列表 |
getPropertyNames:获取属性名列表
|
Map pts; |
属性名和属性对应的类型集合 | hasProperty:判断属性是否存在getPropertyType:根据属性名获取属性对应的类型 |
String[] mns; |
接口所有方法名列表(所有接口公有方法和自己添加的相关方法) |
getMethodNames:获取该属性值 |
String[] dmns; |
接口所有对外暴露方法(所有接口公有方法) |
getDeclaredMethodNames:获取该属性值 |
- 方法信息
方法名 | 描述 |
setPropertyValue |
服务类中公有非静态字段设置 |
getPropertyValue |
服务类中公有非静态字段获取 |
invokeMethod |
服务接口中所有方法会动态生成到这里,消费者远程调用最终也是调用该方法 |
这里需要着重强调一个invokeMethod,以本示例中的UserService接口类为例,其中有一个方法queryUserInfo,该接口通过Wrappper.getWrapper(UserSercvice.class)获取到的invokMethod如下:
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
com.xiu.dubbo.service.UserService w;
try {
//接口实例对象赋值
w = ((com.xiu.dubbo.service.UserService) $1);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
try {
//该接口下的所有方法都会在该处通过方法名判断调用
if ("queryUserInfo".equals($2) && $3.length == 0) {
return ($w) w.queryUserInfo();
}
} catch (Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.xiu.dubbo.service.UserService.");
}
2.2、服务发布
protocol.export(wrapperInvoker)默认实现为DubboProtocol将之前创建的服务接口对应的Invoker构建相应的通信协议发布服务,则服务消费者可以通过建立对应的通信协议调用该服务。
/**
* 服务发布最终创建一个DubboExporter
*/
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//根据之前Dubbo URL :dubbo://xxx:28830/com.xiu.dubbo.service.UserService...
//生成对应的key:com.xiu.dubbo.service.UserService:28830
//生成对应的value:invoker和key包装成DubboExporter对象
//(1)、将上面对应的key:value添加到exporterMap中
URL url = invoker.getUrl();
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//(2)、存在本地存根(远程调用时候核心服务在服务方,如果调用方需要对服务方服务调用完成需要在进行额外的处理
// 则需要创建本地存储对象)则将对应的存根方法存储到stubServiceMethodsMap
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//(3)、进行服务发布 主要是选用netty、Mina通信协议建立对应的Server 比如NettyServer、MinaServer
openServer(url);
//(4)、添加dubbo更高效的序列化机制Kryo, FST
optimizeSerialization(url);
return exporter;
}
上述代码主要分成四部分
(1)、servierKey和其对应DubboExporter存放到存放到exporterMap
(2)、如果存在Stub(本地存根 自行百度)存放到存放到stubServiceMethodsMap、Stub对象是服务调用方 方法的额外增强。
(3)、基于底层通信协议创建对应的XxxServer(nettyServer、MinaServer) 服务端,最终进行服务发布、这里我们后续展开讲解
(4)、添加更高效的序列化机制Kryo、FST
<dubbo:protocol name="dubbo" serialization="kryo" optimizer="com.alibaba.dubbo.demo.SerializationOptimizerImpl"/>
2.3、创建服务
openServer()方法
private void openServer(URL url) {
//根据ip:port先从缓存中获取ExchangeServer对象(该对象包含对应的Transporters协议)
//最终会基于netty、Mina等通信协议创建HeaderExchangeServer 里面包含Server(NettyServer、MinaServer)
String key = url.getAddress();
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
//先从缓存中获取服务 没有则调用createServer创建新的HeaderExchangeServer
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
//如果缓存中存在则ExchangeServer已经创建了 使用新的url重置
//因为一个Dubbo服务中,会存在多个dubbo:service标签,这些标签都会在服务台提供者的同一个IP地址、端口号上暴露服务。
server.reset(url);
}
}
}
根据url的ip:port从缓存中获取 没有则创建createServer(),有则使用最新的url重置服务 server.reset()
createServer()方法
private ExchangeServer createServer(URL url) {
// channel.readonly.sent : 默认为true,表示在发送请求时,是否等待将字节写入socket后再返回
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// heartbeat:增加heartbeat属性 表示心跳间隔时间,默认为60*1000,表示60s
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
//为服务提供者url增加server属性通信机制,可选值为netty,mina等等,默认为netty。
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
//为服务提供者url增加codec属性(协议编码方式),默认值为dubbo,
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
//服务与底层通信协议绑定 服务暴露
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
//服务发布指定了客户端类型client=xxx 则需要查看具体的底层协议是否支持 不支持抛出异常
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
创建服务过程中主要是针对Douubo URL添加一些顶层通信相关的参数,并通过Exchangers.bind(url, requestHandler) 将服务和对应的底层通信协议进行绑定从而完成服务的暴露,同时还对服务是否支持的客户端类型进行校验。
- channel.readonly.sent:默认为true,表示在发送请求时,是否等待将字节写入socket后再返回,。
- heartbeat :为服务提供者心跳间隔时间,表示,默认为60*1000,表示60s。
- server :为服务提供者url增加server 通信协议属性,可选值为netty,mina等等,默认为netty。
- codec:为服务提供者url增加codec属性(协议编码方式),默认值为dubbo,。
- Exchangers.bind:根据服务提供者URI,服务提供者命令请求处理器requestHandler构建ExchangeServer实例。requestHandler的实现具体在以后详细分析Dubbo服务调用时再详细分析。
- client:验证客户端类型是否可用。
2.4、服务协议绑定
//1、com.alibaba.dubbo.remoting.exchange.Exchangers
//为
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
//默认使用headeExchanger去进行绑定
public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
//2、com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
//从中可以看到最终实现绑定是使用Transporters去进行绑定的
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
//3、通信协议绑定
return getTransporter().bind(url, handler);
//com.alibaba.dubbo.remoting.transport.netty.NettyTransporter
//4、以netty作为具体的通信协议 NettyTransporter的bind 创建NettryServer
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
//创建netty服务端
return new NettyServer(url, listener);
}
有关服务绑定,这里不以方法一一展开讲解,这里以绑定流程为分成三个部分
- 根据header 获取对应的HeaderExchanger 该类主要包含两个方法 服务发布的bind和服务消费的connect
HeaderExchanger:主要作用
ExchangeServer bind(URL url, ExchangeHandler handler) : 服务提供者调用。
ExchangeClient connect(URL url, ExchangeHandler handler):服务消费者调用。
- 通过Transporters(网络传输层)去进行绑定操作,这里以netty为例对应的网络协议的实现类为NettyTransporter
- NettyTransporter 绑定最终是创建一个netty通信对应创建NettyServer最终完成了服务的暴露。
同理server.reset()方法最终是调用NettyServer的reset方法。