Dubbo源码学习09
RegistryProtocol.export服务导出流程:导出服务ExporterChangeableWrapper->注册服务到注册中心->订阅注册中心overrideSubscribeUrl数据;篇幅有限,本篇幅主要分析注册服务到注册中心的实现
RegistryProtocol.export(final Invoker<T> invoker)
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
//导出服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:
// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider
// 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 获取已注册的服务提供者 URL,比如:
// dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
//to judge to delay publish whether or not
//获取register参数;register表示是否注册到注册中心
boolean register = registeredProviderUrl.getParameter("register", true);
//缓存到ProviderConsumerRegTable的表中
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
//注册服务到zookeeper
if (register) {
register(registryUrl, registeredProviderUrl);
//找到该originInvoker对应的ProviderInvokerWrapper设置reg属性为true
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
//创建监听器
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
//放入overrideSubscribeUrl对应的OverrideListener
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 向注册中心进行订阅 override 数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
//创建并返回DestroyableExporter
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
getRegistryUrl(originInvoler)
private URL getRegistryUrl(Invoker<?> originInvoker) {
URL registryUrl = originInvoker.getUrl();
if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
//zookeeper
String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
}
return registryUrl;
}
该方法获取注册中心url,假如使用zookeeper作为注册中心,得到的示例
zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
getRegistry(originInvoker)
/**
*
* 根据调用者的地址获取注册表的实例
*
* @param originInvoker
* @return
*/
private Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
return registryFactory.getRegistry(registryUrl);
}
RegistryFactory是dubbo的spi接口,由dubbo的spi机制可知,这里的registryFactory类型为RegistryFactory$Adaptvie代码如下
package com.alibaba.dubbo.registry;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory {
public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) {
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName);
return extension.getRegistry(arg0);
}
}
通过debug得知registryFactory类型为ZookeeperRegistryFactory
AbstractRegistryFactory.getRegistry(URL url)
@Override
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
//zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService
String key = url.toServiceStringWithoutResolving();
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
//访问已经缓存的Registry
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//模板方法,委托给子类创建
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
//加入缓存
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
上述方法首先从缓存REGISTRIES中取,如果取出失败,通过子类覆盖createRegistry(url)创建相应类型的注册中心ZookeeperRegistry然后加入缓存中
ZookeeperRegistryFactory.createRegistry(URL url)
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
ZookeeperRegistry.java
RegistryService:注册中心服务接口
public interface RegistryService {
/**
* 注册数据,例如:提供者服务,使用者地址,路由规则,覆盖规则和其他数据。
* <p>
* 注册需要支持如下规则<br>
* 1. URL设置check = false参数时。注册失败,不会抛出异常而会在后台重试。否则将会抛出异常<br>
* 2. URL设置dynamic = false参数时,他需要被永久存储,否则当注册着异常退出,他应该被删除<br>
* 3. URL设置category=routers,这意味分类存储,默认类型为providers,数据将会被分类部分通知<br>
* 4. 当注册中心被重启了,比如网络抖动,数据不会丢失,包括自动从broken line处删除数据<br>
* 5. 允许具有相同URL但不同参数的URL共存,它们不能相互覆盖。<br>
*
* @param url 注册信息,不允许为空, e.g: dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
*/
void register(URL url);
/**取消注册
*
* <p>
* 取消注册被要求支持如下规则<br>
* 1. 由于设置了dynamic = false存储的属于,当找不到注册信息数据,将会抛出异常,其他情况将会忽略<br>
* 2. 根据完整的网址匹配取消注册。<br>
*
* @param url 注册信息,不允许为空, e.g: dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
*/
void unregister(URL url);
/**
* 订阅合适的注册数据,当注册过的数据修改时,自动推送
* <p>
* 订阅需要遵循的规则<br>
* 1. URL设置check = false参数时。 注册失败时,不会在后台引发异常并重试该异常。<br>
* 2. 当URL设置了 category=routers,将会通知指定类型的数据。多个分类用逗号分隔,并允许星号匹配,这表示已订阅所有分类数据。<br>
* 3. 允许将接口,组,版本和分类器作为条件查询,例如:interface = com.alibaba.foo.BarService&version = 1.0.0<br>
* 4. 查询条件允许星号匹配,订阅所有接口的所有数据包的所有版本,例如 :interface = *&group = *&version = *&classifier = *<br>
* 5. 当注册表重新启动并且出现网络抖动时,有必要自动恢复订阅请求。<br>
* 6. 允许具有相同URL但不同参数的URL共存,它们不能相互覆盖。<br>
* 7. 当第一个通知完成并且返回后,订阅程序必须被阻塞<br>
*
* @param url 订阅条件,不允许为空, e.g. consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
* @param listener 事件变化的监听器,不允许为空
*/
void subscribe(URL url, NotifyListener listener);
/**
* 取消订阅
* <p>
* 取消订阅要遵循的规则<br>
* 1. 如果没有订阅,则直接忽略它。<br>
* 2. 取消订阅完整的URL匹配。<br>
*
* @param url Subscription condition, not allowed to be empty, e.g. consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
* @param listener A listener of the change event, not allowed to be empty
*/
void unsubscribe(URL url, NotifyListener listener);
/**
* Query the registered data that matches the conditions. Corresponding to the push mode of the subscription, this is the pull mode and returns only one result.
*
* 查找匹配条件的注册过的数据.对于订阅的推送模式,这是请求模式将会返回一个结果
* @param url Query condition, is not allowed to be empty, e.g. consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
* @return 注册信息列表,可以为空,含义与参数相同{@link com.alibaba.dubbo.registry.NotifyListener#notify(List<URL>)}.
* @see com.alibaba.dubbo.registry.NotifyListener#notify(List)
*/
List<URL> lookup(URL url);
}
Registry:继承了Node和RegistryService的接口,实现该接口的类的应该是注册中心
AbstractRegistry:用来实现服务与订阅url的缓存文件的创建和生成。
- 成员变量
// URL address separator, used in file cache, service provider URL separation
//URL地址分隔符,用于文件缓存,服务提供商URL分隔
private static final char URL_SEPARATOR = ' ';
// URL address separated regular expression for parsing the service provider URL list in the file cache
private static final String URL_SPLIT = "\\s+";
// Log output
protected final Logger logger = LoggerFactory.getLogger(getClass());
// Local disk cache, where the special key value.registies records the list of registry centers, and the others are the list of notified service providers
//本地磁盘缓存,其中特殊键value.registies记录注册表中心列表,其他是已通知服务提供者的列表
private final Properties properties = new Properties();
// 文件缓存定时写入
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
// 是否同步保存文件
private final boolean syncSaveFile;
// 上次文件缓存变更版本
private final AtomicLong lastCacheChanged = new AtomicLong();
// 已注册服务URL集合
private final Set<URL> registered = new ConcurrentHashSet<URL>();
//已经订阅的<URL, Set<NotifyListener>>
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
//已经通知的<URL, Map<String, List<URL>>>
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
//zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&
// client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685×tamp=1507286468150
private URL registryUrl;
// 本地磁盘缓存文件
private File file;
- 构造函数
public AbstractRegistry(URL url) {
setUrl(url);
/**
* 获取URL对象的save.file属性默认为false代表不异步保存文件
*/
syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
/**
* 获取URL对象的file属性,如果没有则dubbo帮我们指定默认的文件配置:像这样子
* C:\Users\Administrator\.dubbo\dubbo-registry-echo-provider-192.168.1.233:2181.cache
*/
String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
File file = null;
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
this.file = file;
/**
* 加载文件C:\Users\Administrator\.dubbo\dubbo-registry-echo-provider-192.168.1.233:2181.cache内容保存类似下面这样子的
* com.alibaba.dubbo.demo.DemoService=empty\://10.10.10.10\:20880/com.alibaba.dubbo.demo.DemoService?anyhost\=true&application\=demo-provider&category
* \=configurators&check\=false&dubbo\=2.0.0&generic\=false&interface\=com.alibaba.dubbo.demo.DemoService&methods\=sayHello&pid\=5259&side\=provider×tamp\=1507294508053
* 到成员变Properties properties = new Properties()
*/
loadProperties();
/**
* zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=echo-provider&backup=10.20.153.11:2181,10.20.153.12:2181&dubbo=2.0.2&interface=com.alibaba.dubbo.registry.RegistryService&pid=8640×tamp=1572932516092
* 通过调用getBackUpUrls最终变成了
* zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=echo-provider&backup=10.20.153.11:2181,10.20.153.12:2181&dubbo=2.0.2&interface=com.alibaba.dubbo.registry.RegistryService&pid=8640×tamp=1572932516092
* zookeeper://10.20.153.11:2181/com.alibaba.dubbo.registry.RegistryService?application=echo-provider&backup=10.20.153.11:2181,10.20.153.12:2181&dubbo=2.0.2&interface=com.alibaba.dubbo.registry.RegistryService&pid=8640×tamp=1572932516092
* zookeeper://10.20.153.12:2181/com.alibaba.dubbo.registry.RegistryService?application=echo-provider&backup=10.20.153.11:2181,10.20.153.12:2181&dubbo=2.0.2&interface=com.alibaba.dubbo.registry.RegistryService&pid=8640×tamp=1572932516092
*通知监听器,URL 变化结果
*/
notify(url.getBackupUrls());
}
protected void notify(List<URL> urls) {
if (urls == null || urls.isEmpty()) return;
//遍历URL对应的所有NotifyListener
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL url = entry.getKey();
//如果urls中的任意一个url与当subscribed的key对应的url匹配
if (!UrlUtils.isMatch(url, urls.get(0))) {
continue;
}
//通知
Set<NotifyListener> listeners = entry.getValue();
if (listeners != null) {
for (NotifyListener listener : listeners) {
try {
notify(url, listener, filterEmpty(url, urls));
} catch (Throwable t) {
logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
}
}
}
}
}
FailbackRegistry:通过任务调度线程池用来做失败重试操作(包括:注册失败/取消注册失败/订阅失败/取消订阅失败/通知失败)的重试
- 成员变量
// 定时调度线程池,用于对注册失败、订阅失败的重试
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));
// Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
// 重试失败计时器,定期检查是否有失败请求,是否有无限次重试,用于取消重试的调度任务
private final ScheduledFuture<?> retryFuture;
// 注册失败的Set<URL>
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
// 取消注册失败的Set<URL>
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
//订阅失败的url
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
//取消订阅失败的url
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
//通知失败的url
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
- 构造函数:
public FailbackRegistry(URL url) {
//调用父类的构造函数
super(url);
//从URL对象中获取属性retry.period 如果没指定默认为5000毫秒
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
//使用retryExecutor定时调度retry()方法
//retry()方法主要是
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// Check and connect to the registry
try {
/**
* 注册失败的重新注册url
* 取消注册失败的url重新取消注册
* 订阅失败的url重新订阅
* 取消订阅失败的url重新取消订阅
*/
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
// Retry the failed actions
protected void retry() {
if (!failedRegistered.isEmpty()) {
Set<URL> failed = new HashSet<URL>(failedRegistered);
if (failed.size() > 0) {
if (logger.isInfoEnabled()) {
logger.info("Retry register " + failed);
}
try {
for (URL url : failed) {
try {
doRegister(url);
failedRegistered.remove(url);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
if (!failedUnregistered.isEmpty()) {
Set<URL> failed = new HashSet<URL>(failedUnregistered);
if (!failed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Retry unregister " + failed);
}
try {
for (URL url : failed) {
try {
doUnregister(url);
failedUnregistered.remove(url);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
if (!failedSubscribed.isEmpty()) {
Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);
for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
if (entry.getValue() == null || entry.getValue().size() == 0) {
failed.remove(entry.getKey());
}
}
if (failed.size() > 0) {
if (logger.isInfoEnabled()) {
logger.info("Retry subscribe " + failed);
}
try {
for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
URL url = entry.getKey();
Set<NotifyListener> listeners = entry.getValue();
for (NotifyListener listener : listeners) {
try {
doSubscribe(url, listener);
listeners.remove(listener);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
if (!failedUnsubscribed.isEmpty()) {
Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed);
for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
if (entry.getValue() == null || entry.getValue().isEmpty()) {
failed.remove(entry.getKey());
}
}
if (failed.size() > 0) {
if (logger.isInfoEnabled()) {
logger.info("Retry unsubscribe " + failed);
}
try {
for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
URL url = entry.getKey();
Set<NotifyListener> listeners = entry.getValue();
for (NotifyListener listener : listeners) {
try {
doUnsubscribe(url, listener);
listeners.remove(listener);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
if (!failedNotified.isEmpty()) {
Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified);
for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, List<URL>>>(failed).entrySet()) {
if (entry.getValue() == null || entry.getValue().size() == 0) {
failed.remove(entry.getKey());
}
}
if (failed.size() > 0) {
if (logger.isInfoEnabled()) {
logger.info("Retry notify " + failed);
}
try {
for (Map<NotifyListener, List<URL>> values : failed.values()) {
for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {
try {
NotifyListener listener = entry.getKey();
List<URL> urls = entry.getValue();
listener.notify(urls);
values.remove(listener);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
}
ZookeeperRegistry:Zookeeper实现dubbo的注册中心
- 成员变量
/**
* 默认zookeeper的根节点
*/
private final static String DEFAULT_ROOT = "dubbo";
/**
* zookeeper的根节点
*/
private final String root;
/**
* 服务接口集合
*/
private final Set<String> anyServices = new ConcurrentHashSet<String>();
/**
* 监听器
*/
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
/**
* 操作zookeeper的客户端实例
*/
private final ZookeeperClient zkClient;
- 构造函数
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
//获取url中的group属性,不存在使用dubbo作为默认分组
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
//如果不以"/"开头,添加文件分隔符
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
//zookeeper的根节点为group
this.root = group;
//基于dubbo的spi机制会根据url中携带的参数去选择用哪个实现类。
//目前提供了ZkclientZookeeperTransporter这时候的zkClient为ZkclientZookeeperClient
//CuratorZookeeperTransporter这时候的zkClient为CuratorZookeeperClient
zkClient = zookeeperTransporter.connect(url);
//添加状态监听器
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
//重连后,调动recover方法
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
调用父类构造函数,构造函数通过读取url中的group参数初始化zookeeper的根节点;通过zookeeperTransporter获取ZookeeperClient,ZookeeperTransporter为dubbo的spi接口,根据dubbo的spi实例化的过程这里的zookeeperTransporter类型为ZookeeperTransporter$Adaptive其代码如下
package com.alibaba.dubbo.remoting.zookeeper;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ZookeeperTransporter$Adaptive implements com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter {
public com.alibaba.dubbo.remoting.zookeeper.ZookeeperClient connect(com.alibaba.dubbo.common.URL arg0) {
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
String extName = url.getParameter("client", url.getParameter("transporter", "curator"));
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter) name from url(" + url.toString() + ") use keys([client, transporter])");
com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter extension = (com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter.class).getExtension(extName);
return extension.connect(arg0);
}
}
所以不难得知dubbo默认情况下使用的zookeeper的客户端类型为CuratorZookeeperTransporter,代码如下
public class CuratorZookeeperTransporter implements ZookeeperTransporter {
@Override
public ZookeeperClient connect(URL url) {
return new CuratorZookeeperClient(url);
}
}
最后给zkClient添加StateListener;该StateListener监听到zk客户端的重连事件调用recover()方法,添加到failedRegistered和failedSubscribed通过FailbackRegistry定时任务重新注册、重新订阅,代码如下。
@Override
protected void recover() throws Exception {
// 已经注册的url添加到failedRegistered,通过FailbackRegistry的定时任务
//进行重新注册
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
for (URL url : recoverRegistered) {
failedRegistered.add(url);
}
}
// 已经订阅的URL添加到failedSubscribed,通过FailbackRegistry的定时任务
// 进行重新订阅
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
addFailedSubscribed(url, listener);
}
}
}
}
getRegisteredProviderUrl(originInvoker)
private URL getRegisteredProviderUrl(final Invoker<?> originInvoker) {
URL providerUrl = getProviderUrl(originInvoker);
//The address you see at the registry
return providerUrl.removeParameters(getFilteredKeys(providerUrl))
.removeParameter(Constants.MONITOR_KEY)
.removeParameter(Constants.BIND_IP_KEY)
.removeParameter(Constants.BIND_PORT_KEY)
.removeParameter(QOS_ENABLE)
.removeParameter(QOS_PORT)
.removeParameter(ACCEPT_FOREIGN_IP)
.removeParameter(VALIDATION_KEY);
}
该方法返回的key类似
dubbo://169.254.22.149:20880/com.alibaba.dubbo.study.day01.xml.service.EchoService?addListener.1.callback=true&addListener.retries=2&anyhost=true&application=echo-provider&bean.name=com.alibaba.dubbo.study.day01.xml.service.EchoService&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.study.day01.xml.service.EchoService&methods=echo,addListener&pid=6688&side=provider×tamp=1572936570702
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
public static void registerProvider(Invoker invoker, URL registryUrl, URL providerUrl) {
ProviderInvokerWrapper wrapperInvoker = new ProviderInvokerWrapper(invoker, registryUrl, providerUrl);
// group/interface:version
// 比如group2/com.alibaba.dubbo.study.day01.xml.service.EchoService:1.0.0
String serviceUniqueName = providerUrl.getServiceKey();
// 获取group/interface:version对应的ProviderInvokerWrapper列表
Set<ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);
//不存在是,缓存进去谢谢
if (invokers == null) {
providerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashSet<ProviderInvokerWrapper>());
invokers = providerInvokers.get(serviceUniqueName);
}
invokers.add(wrapperInvoker);
}
将registryUrl,providerUrl,invoker封装成ProvidereInvokerWrapper对象,然后根据providerUrl生成一个服务的名称保存到内存中,也就是说我们可以通过ProviderConsumerRegTable类拿到已经注册过的服务的相关信息!!!
register(registryUrl, registeredProviderUrl);
public void register(URL registryUrl, URL registedProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
获取注册中心,交给注册中心注册服务(由上文分析可知此时registry为ZookeeperRegistry)
FailbackRegistry.registry(URL url)
@Override
public void register(URL url) {
//父类方法实现将该url添加到registered集合中
super.register(url);
//从failedRegistered和failedUnregistered集合一处url
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// Sending a registration request to the server side
// 委托给子类实现
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
// 如果启动检测已打开,则直接引发Exception。如果check是true直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
//记录注册失败的url
failedRegistered.add(url);
}
}
ZookeeperRegistry.doRegistry(URL url)
@Override
protected void doRegister(URL url) {
try {
//创建URL节点
//通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
///${group}/${serviceInterface}/providers/${url}
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
委托给zkClient创建zk的目录
AbstractZookeeperClient.create(String path, boolean ephemeral)
@Override
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
// 如果要创建的节点类型非临时节点,那么这里要检测节点是否存在
if (checkExists(path)) {
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
//递归创建父节点
create(path.substring(0, i), false);
}
if (ephemeral) {
//创建临时节点
createEphemeral(path);
} else {
//创建持久节点
createPersistent(path);
}
}