欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Dubbo源码学习09

程序员文章站 2023-12-22 16:50:16
...

RegistryProtocol.export服务导出流程:导出服务ExporterChangeableWrapper->注册服务到注册中心->订阅注册中心overrideSubscribeUrl数据;篇幅有限,本篇幅主要分析注册服务到注册中心的实现

RegistryProtocol.export(final Invoker<T> invoker)

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        //导出服务
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        // 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:
        // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
        URL registryUrl = getRegistryUrl(originInvoker);

        //registry provider
        // 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
        final Registry registry = getRegistry(originInvoker);
        // 获取已注册的服务提供者 URL,比如:
        // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
        final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

        //to judge to delay publish whether or not
        //获取register参数;register表示是否注册到注册中心
        boolean register = registeredProviderUrl.getParameter("register", true);
        //缓存到ProviderConsumerRegTable的表中
        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
        //注册服务到zookeeper
        if (register) {
            register(registryUrl, registeredProviderUrl);
            //找到该originInvoker对应的ProviderInvokerWrapper设置reg属性为true
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
        //创建监听器
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        //放入overrideSubscribeUrl对应的OverrideListener
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        // 向注册中心进行订阅 override 数据
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //Ensure that a new exporter instance is returned every time export
        //创建并返回DestroyableExporter
        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
    }

getRegistryUrl(originInvoler)

private URL getRegistryUrl(Invoker<?> originInvoker) {
        URL registryUrl = originInvoker.getUrl();
        if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
            //zookeeper
            String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
            registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
        }
        return registryUrl;
    }

该方法获取注册中心url,假如使用zookeeper作为注册中心,得到的示例

zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider

getRegistry(originInvoker)

/**
     *
     * 根据调用者的地址获取注册表的实例
     *
     * @param originInvoker
     * @return
     */
    private Registry getRegistry(final Invoker<?> originInvoker) {
        URL registryUrl = getRegistryUrl(originInvoker);
        return registryFactory.getRegistry(registryUrl);
    }

RegistryFactory是dubbo的spi接口,由dubbo的spi机制可知,这里的registryFactory类型为RegistryFactory$Adaptvie代码如下

package com.alibaba.dubbo.registry;

import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory {
    public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg0;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName);
        return extension.getRegistry(arg0);
    }
}

通过debug得知registryFactory类型为ZookeeperRegistryFactoryDubbo源码学习09

AbstractRegistryFactory.getRegistry(URL url)

@Override
    public Registry getRegistry(URL url) {
        url = url.setPath(RegistryService.class.getName())
                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
        //zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService
        String key = url.toServiceStringWithoutResolving();
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            //访问已经缓存的Registry
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //模板方法,委托给子类创建
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            //加入缓存
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }

上述方法首先从缓存REGISTRIES中取,如果取出失败,通过子类覆盖createRegistry(url)创建相应类型的注册中心ZookeeperRegistry然后加入缓存中

ZookeeperRegistryFactory.createRegistry(URL url)

@Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

ZookeeperRegistry.java

Dubbo源码学习09

RegistryService:注册中心服务接口

public interface RegistryService {

    /**
     * 注册数据,例如:提供者服务,使用者地址,路由规则,覆盖规则和其他数据。
     * <p>
     * 注册需要支持如下规则<br>
     * 1. URL设置check = false参数时。注册失败,不会抛出异常而会在后台重试。否则将会抛出异常<br>
     * 2. URL设置dynamic = false参数时,他需要被永久存储,否则当注册着异常退出,他应该被删除<br>
     * 3. URL设置category=routers,这意味分类存储,默认类型为providers,数据将会被分类部分通知<br>
     * 4. 当注册中心被重启了,比如网络抖动,数据不会丢失,包括自动从broken line处删除数据<br>
     * 5. 允许具有相同URL但不同参数的URL共存,它们不能相互覆盖。<br>
     *
     * @param url  注册信息,不允许为空, e.g: dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
     */
    void register(URL url);

    /**取消注册
     *
     * <p>
     * 取消注册被要求支持如下规则<br>
     * 1. 由于设置了dynamic = false存储的属于,当找不到注册信息数据,将会抛出异常,其他情况将会忽略<br>
     * 2. 根据完整的网址匹配取消注册。<br>
     *
     * @param url 注册信息,不允许为空, e.g: dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
     */
    void unregister(URL url);

    /**
     * 订阅合适的注册数据,当注册过的数据修改时,自动推送
     * <p>
     * 订阅需要遵循的规则<br>
     * 1. URL设置check = false参数时。 注册失败时,不会在后台引发异常并重试该异常。<br>
     * 2. 当URL设置了 category=routers,将会通知指定类型的数据。多个分类用逗号分隔,并允许星号匹配,这表示已订阅所有分类数据。<br>
     * 3. 允许将接口,组,版本和分类器作为条件查询,例如:interface = com.alibaba.foo.BarService&version = 1.0.0<br>
     * 4. 查询条件允许星号匹配,订阅所有接口的所有数据包的所有版本,例如 :interface = *&group = *&version = *&classifier = *<br>
     * 5. 当注册表重新启动并且出现网络抖动时,有必要自动恢复订阅请求。<br>
     * 6. 允许具有相同URL但不同参数的URL共存,它们不能相互覆盖。<br>
     * 7. 当第一个通知完成并且返回后,订阅程序必须被阻塞<br>
     *
     * @param url      订阅条件,不允许为空, e.g. consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
     * @param listener 事件变化的监听器,不允许为空
     */
    void subscribe(URL url, NotifyListener listener);

    /**
     * 取消订阅
     * <p>
     * 取消订阅要遵循的规则<br>
     * 1. 如果没有订阅,则直接忽略它。<br>
     * 2. 取消订阅完整的URL匹配。<br>
     *
     * @param url      Subscription condition, not allowed to be empty, e.g. consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
     * @param listener A listener of the change event, not allowed to be empty
     */
    void unsubscribe(URL url, NotifyListener listener);

    /**
     * Query the registered data that matches the conditions. Corresponding to the push mode of the subscription, this is the pull mode and returns only one result.
     *
     * 查找匹配条件的注册过的数据.对于订阅的推送模式,这是请求模式将会返回一个结果
     * @param url Query condition, is not allowed to be empty, e.g. consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
     * @return  注册信息列表,可以为空,含义与参数相同{@link com.alibaba.dubbo.registry.NotifyListener#notify(List<URL>)}.
     * @see com.alibaba.dubbo.registry.NotifyListener#notify(List)
     */
    List<URL> lookup(URL url);

}

Registry:继承了Node和RegistryService的接口,实现该接口的类的应该是注册中心

AbstractRegistry:用来实现服务与订阅url的缓存文件的创建和生成。

  • 成员变量
// URL address separator, used in file cache, service provider URL separation
    //URL地址分隔符,用于文件缓存,服务提供商URL分隔
    private static final char URL_SEPARATOR = ' ';
    // URL address separated regular expression for parsing the service provider URL list in the file cache
    private static final String URL_SPLIT = "\\s+";
    // Log output
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    // Local disk cache, where the special key value.registies records the list of registry centers, and the others are the list of notified service providers
    //本地磁盘缓存,其中特殊键value.registies记录注册表中心列表,其他是已通知服务提供者的列表
    private final Properties properties = new Properties();
    // 文件缓存定时写入
    private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
    // 是否同步保存文件
    private final boolean syncSaveFile;
    // 上次文件缓存变更版本
    private final AtomicLong lastCacheChanged = new AtomicLong();
    // 已注册服务URL集合
    private final Set<URL> registered = new ConcurrentHashSet<URL>();
    //已经订阅的<URL, Set<NotifyListener>>
    private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    //已经通知的<URL, Map<String, List<URL>>>
    private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
    //zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&
    // client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685&timestamp=1507286468150
    private URL registryUrl;
    // 本地磁盘缓存文件
    private File file;
  • 构造函数
public AbstractRegistry(URL url) {
        setUrl(url);
        /**
         * 获取URL对象的save.file属性默认为false代表不异步保存文件
         */
        syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
        /**
         * 获取URL对象的file属性,如果没有则dubbo帮我们指定默认的文件配置:像这样子
         * C:\Users\Administrator\.dubbo\dubbo-registry-echo-provider-192.168.1.233:2181.cache
         */
        String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
        File file = null;
        if (ConfigUtils.isNotEmpty(filename)) {
            file = new File(filename);
            if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                if (!file.getParentFile().mkdirs()) {
                    throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                }
            }
        }
        this.file = file;
        /**
         * 加载文件C:\Users\Administrator\.dubbo\dubbo-registry-echo-provider-192.168.1.233:2181.cache内容保存类似下面这样子的
         *  com.alibaba.dubbo.demo.DemoService=empty\://10.10.10.10\:20880/com.alibaba.dubbo.demo.DemoService?anyhost\=true&application\=demo-provider&category
         *  \=configurators&check\=false&dubbo\=2.0.0&generic\=false&interface\=com.alibaba.dubbo.demo.DemoService&methods\=sayHello&pid\=5259&side\=provider&timestamp\=1507294508053
         * 到成员变Properties properties = new Properties()
         */
        loadProperties();
       /**
         * zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=echo-provider&backup=10.20.153.11:2181,10.20.153.12:2181&dubbo=2.0.2&interface=com.alibaba.dubbo.registry.RegistryService&pid=8640&timestamp=1572932516092
         * 通过调用getBackUpUrls最终变成了
         * zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=echo-provider&backup=10.20.153.11:2181,10.20.153.12:2181&dubbo=2.0.2&interface=com.alibaba.dubbo.registry.RegistryService&pid=8640&timestamp=1572932516092
         * zookeeper://10.20.153.11:2181/com.alibaba.dubbo.registry.RegistryService?application=echo-provider&backup=10.20.153.11:2181,10.20.153.12:2181&dubbo=2.0.2&interface=com.alibaba.dubbo.registry.RegistryService&pid=8640&timestamp=1572932516092
         * zookeeper://10.20.153.12:2181/com.alibaba.dubbo.registry.RegistryService?application=echo-provider&backup=10.20.153.11:2181,10.20.153.12:2181&dubbo=2.0.2&interface=com.alibaba.dubbo.registry.RegistryService&pid=8640&timestamp=1572932516092
         *通知监听器,URL 变化结果
         */
        notify(url.getBackupUrls());
    }

​
protected void notify(List<URL> urls) {
        if (urls == null || urls.isEmpty()) return;
        //遍历URL对应的所有NotifyListener
        for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
            URL url = entry.getKey();
            //如果urls中的任意一个url与当subscribed的key对应的url匹配
            if (!UrlUtils.isMatch(url, urls.get(0))) {
                continue;
            }
            //通知
            Set<NotifyListener> listeners = entry.getValue();
            if (listeners != null) {
                for (NotifyListener listener : listeners) {
                    try {
                        notify(url, listener, filterEmpty(url, urls));
                    } catch (Throwable t) {
                        logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                    }
                }
            }
        }
    }

​

FailbackRegistry:通过任务调度线程池用来做失败重试操作(包括:注册失败/取消注册失败/订阅失败/取消订阅失败/通知失败)的重试

  • 成员变量
 // 定时调度线程池,用于对注册失败、订阅失败的重试
    private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));

    // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
    // 重试失败计时器,定期检查是否有失败请求,是否有无限次重试,用于取消重试的调度任务
    private final ScheduledFuture<?> retryFuture;
    // 注册失败的Set<URL>
    private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
    // 取消注册失败的Set<URL>
    private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
    //订阅失败的url
    private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    //取消订阅失败的url
    private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    //通知失败的url
    private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
  • 构造函数:
public FailbackRegistry(URL url) {
        //调用父类的构造函数
        super(url);
        //从URL对象中获取属性retry.period 如果没指定默认为5000毫秒
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        //使用retryExecutor定时调度retry()方法
        //retry()方法主要是
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                // Check and connect to the registry
                try {
                    /**
                     * 注册失败的重新注册url
                     * 取消注册失败的url重新取消注册
                     * 订阅失败的url重新订阅
                     * 取消订阅失败的url重新取消订阅
                     */
                    retry();
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }

// Retry the failed actions
    protected void retry() {
        if (!failedRegistered.isEmpty()) {
            Set<URL> failed = new HashSet<URL>(failedRegistered);
            if (failed.size() > 0) {
                if (logger.isInfoEnabled()) {
                    logger.info("Retry register " + failed);
                }
                try {
                    for (URL url : failed) {
                        try {
                            doRegister(url);
                            failedRegistered.remove(url);
                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                            logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                        }
                    }
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
        if (!failedUnregistered.isEmpty()) {
            Set<URL> failed = new HashSet<URL>(failedUnregistered);
            if (!failed.isEmpty()) {
                if (logger.isInfoEnabled()) {
                    logger.info("Retry unregister " + failed);
                }
                try {
                    for (URL url : failed) {
                        try {
                            doUnregister(url);
                            failedUnregistered.remove(url);
                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                            logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                        }
                    }
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
        if (!failedSubscribed.isEmpty()) {
            Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);
            for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
                if (entry.getValue() == null || entry.getValue().size() == 0) {
                    failed.remove(entry.getKey());
                }
            }
            if (failed.size() > 0) {
                if (logger.isInfoEnabled()) {
                    logger.info("Retry subscribe " + failed);
                }
                try {
                    for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                        URL url = entry.getKey();
                        Set<NotifyListener> listeners = entry.getValue();
                        for (NotifyListener listener : listeners) {
                            try {
                                doSubscribe(url, listener);
                                listeners.remove(listener);
                            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                                logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                            }
                        }
                    }
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
        if (!failedUnsubscribed.isEmpty()) {
            Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed);
            for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
                if (entry.getValue() == null || entry.getValue().isEmpty()) {
                    failed.remove(entry.getKey());
                }
            }
            if (failed.size() > 0) {
                if (logger.isInfoEnabled()) {
                    logger.info("Retry unsubscribe " + failed);
                }
                try {
                    for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                        URL url = entry.getKey();
                        Set<NotifyListener> listeners = entry.getValue();
                        for (NotifyListener listener : listeners) {
                            try {
                                doUnsubscribe(url, listener);
                                listeners.remove(listener);
                            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                                logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                            }
                        }
                    }
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
        if (!failedNotified.isEmpty()) {
            Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified);
            for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, List<URL>>>(failed).entrySet()) {
                if (entry.getValue() == null || entry.getValue().size() == 0) {
                    failed.remove(entry.getKey());
                }
            }
            if (failed.size() > 0) {
                if (logger.isInfoEnabled()) {
                    logger.info("Retry notify " + failed);
                }
                try {
                    for (Map<NotifyListener, List<URL>> values : failed.values()) {
                        for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {
                            try {
                                NotifyListener listener = entry.getKey();
                                List<URL> urls = entry.getValue();
                                listener.notify(urls);
                                values.remove(listener);
                            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                                logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                            }
                        }
                    }
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
    }

ZookeeperRegistry:Zookeeper实现dubbo的注册中心

  • 成员变量
 /**
     * 默认zookeeper的根节点
     */
    private final static String DEFAULT_ROOT = "dubbo";
    /**
     * zookeeper的根节点
     */
    private final String root;
    /**
     * 服务接口集合
     */
    private final Set<String> anyServices = new ConcurrentHashSet<String>();
    /**
     * 监听器
     */
    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
    /**
     * 操作zookeeper的客户端实例
     */
    private final ZookeeperClient zkClient;
  • 构造函数
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        //获取url中的group属性,不存在使用dubbo作为默认分组
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        //如果不以"/"开头,添加文件分隔符
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        //zookeeper的根节点为group
        this.root = group;
        //基于dubbo的spi机制会根据url中携带的参数去选择用哪个实现类。
        //目前提供了ZkclientZookeeperTransporter这时候的zkClient为ZkclientZookeeperClient
        //CuratorZookeeperTransporter这时候的zkClient为CuratorZookeeperClient
        zkClient = zookeeperTransporter.connect(url);
        //添加状态监听器
        zkClient.addStateListener(new StateListener() {
            @Override
            public void stateChanged(int state) {
                //重连后,调动recover方法
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }

调用父类构造函数,构造函数通过读取url中的group参数初始化zookeeper的根节点;通过zookeeperTransporter获取ZookeeperClient,ZookeeperTransporter为dubbo的spi接口,根据dubbo的spi实例化的过程这里的zookeeperTransporter类型为ZookeeperTransporter$Adaptive其代码如下

package com.alibaba.dubbo.remoting.zookeeper;

import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class ZookeeperTransporter$Adaptive implements com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter {
    public com.alibaba.dubbo.remoting.zookeeper.ZookeeperClient connect(com.alibaba.dubbo.common.URL arg0) {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg0;
        String extName = url.getParameter("client", url.getParameter("transporter", "curator"));
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter) name from url(" + url.toString() + ") use keys([client, transporter])");
        com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter extension = (com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter.class).getExtension(extName);
        return extension.connect(arg0);
    }
}

所以不难得知dubbo默认情况下使用的zookeeper的客户端类型为CuratorZookeeperTransporter,代码如下

public class CuratorZookeeperTransporter implements ZookeeperTransporter {

    @Override
    public ZookeeperClient connect(URL url) {
        return new CuratorZookeeperClient(url);
    }

}

最后给zkClient添加StateListener;该StateListener监听到zk客户端的重连事件调用recover()方法,添加到failedRegistered和failedSubscribed通过FailbackRegistry定时任务重新注册、重新订阅,代码如下。

@Override
    protected void recover() throws Exception {
        // 已经注册的url添加到failedRegistered,通过FailbackRegistry的定时任务
        //进行重新注册
        Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
        if (!recoverRegistered.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover register url " + recoverRegistered);
            }
            for (URL url : recoverRegistered) {
                failedRegistered.add(url);
            }
        }
        // 已经订阅的URL添加到failedSubscribed,通过FailbackRegistry的定时任务
        // 进行重新订阅
        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!recoverSubscribed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover subscribe url " + recoverSubscribed.keySet());
            }
            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    addFailedSubscribed(url, listener);
                }
            }
        }
    }

getRegisteredProviderUrl(originInvoker)

private URL getRegisteredProviderUrl(final Invoker<?> originInvoker) {
        URL providerUrl = getProviderUrl(originInvoker);
        //The address you see at the registry
        return providerUrl.removeParameters(getFilteredKeys(providerUrl))
                .removeParameter(Constants.MONITOR_KEY)
                .removeParameter(Constants.BIND_IP_KEY)
                .removeParameter(Constants.BIND_PORT_KEY)
                .removeParameter(QOS_ENABLE)
                .removeParameter(QOS_PORT)
                .removeParameter(ACCEPT_FOREIGN_IP)
                .removeParameter(VALIDATION_KEY);
    }

该方法返回的key类似

dubbo://169.254.22.149:20880/com.alibaba.dubbo.study.day01.xml.service.EchoService?addListener.1.callback=true&addListener.retries=2&anyhost=true&application=echo-provider&bean.name=com.alibaba.dubbo.study.day01.xml.service.EchoService&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.study.day01.xml.service.EchoService&methods=echo,addListener&pid=6688&side=provider&timestamp=1572936570702

ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

public static void registerProvider(Invoker invoker, URL registryUrl, URL providerUrl) {
        ProviderInvokerWrapper wrapperInvoker = new ProviderInvokerWrapper(invoker, registryUrl, providerUrl);
        // group/interface:version
        // 比如group2/com.alibaba.dubbo.study.day01.xml.service.EchoService:1.0.0
        String serviceUniqueName = providerUrl.getServiceKey();
        // 获取group/interface:version对应的ProviderInvokerWrapper列表
        Set<ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);
        //不存在是,缓存进去谢谢
        if (invokers == null) {
            providerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashSet<ProviderInvokerWrapper>());
            invokers = providerInvokers.get(serviceUniqueName);
        }

        invokers.add(wrapperInvoker);
    }

将registryUrl,providerUrl,invoker封装成ProvidereInvokerWrapper对象,然后根据providerUrl生成一个服务的名称保存到内存中,也就是说我们可以通过ProviderConsumerRegTable类拿到已经注册过的服务的相关信息!!!

register(registryUrl, registeredProviderUrl);

public void register(URL registryUrl, URL registedProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registedProviderUrl);
    }

获取注册中心,交给注册中心注册服务(由上文分析可知此时registry为ZookeeperRegistry)

FailbackRegistry.registry(URL url)

@Override
    public void register(URL url) {
        //父类方法实现将该url添加到registered集合中
        super.register(url);
        //从failedRegistered和failedUnregistered集合一处url
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // Sending a registration request to the server side
            // 委托给子类实现
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            // 如果启动检测已打开,则直接引发Exception。如果check是true直接抛出异常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            //记录注册失败的url
            failedRegistered.add(url);
        }
    }

ZookeeperRegistry.doRegistry(URL url)

@Override
    protected void doRegister(URL url) {
        try {
            //创建URL节点
            //通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
            ///${group}/${serviceInterface}/providers/${url}
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

委托给zkClient创建zk的目录

AbstractZookeeperClient.create(String path, boolean ephemeral)

@Override
    public void create(String path, boolean ephemeral) {
        if (!ephemeral) {
            // 如果要创建的节点类型非临时节点,那么这里要检测节点是否存在
            if (checkExists(path)) {
                return;
            }
        }
        int i = path.lastIndexOf('/');
        if (i > 0) {
            //递归创建父节点
            create(path.substring(0, i), false);
        }
        if (ephemeral) {
            //创建临时节点
            createEphemeral(path);
        } else {
            //创建持久节点
            createPersistent(path);
        }
    }

 

相关标签: 注册服务

上一篇:

下一篇: