欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

3、Dubbo的Directory目录与Router路由服务

程序员文章站 2024-03-12 08:51:38
...

 

当我们聊到Dubbo的Directory服务目录&Router路由服务时,我们应该考虑什么?

  • 服务目录主要用来管理服务提供者列表,它实现了哪些管理功能?
  • 消费端如何使用服务目录中的服务列表?进行服务调用?

         在服务消费端应用中,每个需要消费的服务都被包装为ReferenceConfig,在应用启动时会调用每个服务对应的ReferenceConfig的get()方法,然后为每个服务创建一个自己的RegistryDirectory对象,每个RegistryDirectory管理该服务提供者的Invoker的地址列表、路由规则、动态配置等信息当服务提供者的信息发生变化时,RegistryDirectory会动态的得到变化通知并自动更新;

Dubbo的Directory目录

       Directory的实现有RegistryDirectory和StaticDirectory两种,前者管理的invoker列表是根据服务注册中心的推送变化而变化的,而后者是当消费端使用端注册中心的时候,会把所有服务注册中心的invoker列表汇集到一个invoker列表中;

我们主要看RegistryDirectory:

  • 消费端何时创建RegistryDirectory
  • RegistryDirectory管理的invoker列表如何动态变化
  • 路由信息如何保存&变化

1、RegistryDirectory创建

(1)、ReferenceConfig开始

RegistryDirectory是在服务消费端启动的时候创建的。ReferenceConfig代表了一个要消费的服务的配置对象,首先会调用ReferenceConfig的get()方法,意味着要创建一个对服务提供方远程调用的代理。

public synchronized T get() {
        // 已销毁,不可获得
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        // 初始化
        if (ref == null) {
            init();
        }
        return ref;
}

init()中会创建服务提供端调用的代理对象;

 private void init() {
    ....
     // 创建 Service 代理对象
    ref = createProxy(map);
    ....
}

创建代理的时候,首先会调用RegistryProtocol类的refer方法,这里Protocol是一个SPI扩展点RegistryProtocol是它的一个自适应的扩展实现,真正发起调用是用其适配器类Protocol$Adaptive进行间接调用的。另外ProtocolListenerWrapper、QosProtocolWrapper和ProtocolFilterWrapper是对RegistryProtocol类的功能的增强;

 /**
  * 创建 Service 代理对象
  *
  * @param map 集合
  * @return 代理对象
  */
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
    ....
    invoker = refprotocol.refer(interfaceClass, urls.get(0));
    ....
    // 创建 Service 代理对象
    return (T) proxyFactory.getProxy(invoker);
}

(1)、RegistryProtocol进入refer

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        ...       
        // 获得注册中心
        Registry registry = registryFactory.getRegistry(url);
        ....
        // 执行服务引用
        return doRefer(cluster, registry, type, url);
}
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // 创建 RegistryDirectory 对象,并设置注册中心
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // 创建订阅 URL
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); // 服务引用配置集合
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        // 向注册中心注册自己(服务消费者)
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false))); // 不检查的原因是,不需要检查。
        }
        // 向注册中心订阅服务提供者 + 路由规则 + 配置规则
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                        Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        // 创建 Invoker 对象
        Invoker invoker = cluster.join(directory);
        // 向本地注册表,注册消费者
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

我们可看到在RegistryProtocol的refer()方法的dorefer()中创建了RegistryDirectory目录;

1、RegistryDirectory中invoker列表的更新

(1)、ZookeeperRegistry的doSubscribe

创建完RegistryDirectory后,调用了器subscribe()方法,这里假设使用的注册中心是zookeeper,这样就会去ZookeeperRegistry去订阅需要调用的服务提供者的地址列表;订阅方法中会添加一个监听器,当zookeeper发现服务提供者地址列表发生变化时,zkClient会回调这个监听器的notify()方法;

@Override
    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            // 处理所有 Service 层的发起订阅,例如监控中心的订阅
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                // 获得 url 对应的监听器集合
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) { // 不存在,进行创建
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                // 获得 ChildListener 对象
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) { // 不存在 ChildListener 对象,进行创建 ChildListener 对象
                    listeners.putIfAbsent(listener, new ChildListener() {
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            for (String child : currentChilds) {
                                child = URL.decode(child);
                                // 新增 Service 接口全名时(即新增服务),发起该 Service 层的订阅
                                if (!anyServices.contains(child)) {
                                    anyServices.add(child);
                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                                }
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                // 创建 Service 节点。该节点为持久节点。
                zkClient.create(root, false);
                // 向 Zookeeper ,Service 节点,发起订阅
                List<String> services = zkClient.addChildListener(root, zkListener);
                // 首次全量数据获取完成时,循环 Service 接口全名数组,发起该 Service 层的订阅
                if (services != null && !services.isEmpty()) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            // 处理指定 Service 层的发起订阅,例如服务消费者的订阅
            } else {
                // 子节点数据数组
                List<URL> urls = new ArrayList<URL>();
                // 循环分类数组
                for (String path : toCategoriesPath(url)) {
                    // 获得 url 对应的监听器集合
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) { // 不存在,进行创建
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    // 获得 ChildListener 对象
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) { // 不存在 ChildListener 对象,进行创建 ChildListener 对象
                        listeners.putIfAbsent(listener, new ChildListener() {
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                // 变更时,调用 `#notify(...)` 方法,回调 NotifyListener
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    // 创建 Type 节点。该节点为持久节点。
                    zkClient.create(path, false);
                    // 向 Zookeeper ,PATH 节点,发起订阅
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    // 添加到 `urls` 中
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // 首次全量数据获取完成时,调用 `#notify(...)` 方法,回调 NotifyListener
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

(2)、RegistryDerictory的notify()通知

订阅zk并设置监听器后,同步返回了订阅的服务地址列表、路由规则、配置信息等,然后同步调用了RegistryDerictory的notify()方法返回,返回的服务提供者信息被分为三组(地址列表、路由规则、服务降级等配置信息),并通过refreshInvoker()写入到RoutChain里,当服务消费方的集群容错策略要获取可用服务提供者对应的invoker列表时,会调用RouterChain的route()方法,其内部根据路由规则信息和invoker列表提供服务

 @Override
 public synchronized void notify(List<URL> urls) {
        // 根据 URL 的分类或协议,分组成三个集合 。
        List<URL> invokerUrls = new ArrayList<URL>(); // 服务提供者 URL 集合
        List<URL> routerUrls = new ArrayList<URL>();
        List<URL> configuratorUrls = new ArrayList<URL>();
        for (URL url : urls) {
            String protocol = url.getProtocol();
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routerUrls.add(url);
            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                configuratorUrls.add(url);
            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                invokerUrls.add(url);
            } else {
                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
            }
        }
        // 处理配置规则 URL 集合
        // configurators
        if (!configuratorUrls.isEmpty()) {
            this.configurators = toConfigurators(configuratorUrls);
        }
        // 处理路由规则 URL 集合
        // routers
        if (!routerUrls.isEmpty()) {
            List<Router> routers = toRouters(routerUrls);
            if (routers != null) { // null - do nothing
                setRouters(routers);
            }
        }
        // 合并配置规则,到 `directoryUrl` 中,形成 `overrideDirectoryUrl` 变量。
        List<Configurator> localConfigurators = this.configurators; // local reference
        // merge override parameters
        this.overrideDirectoryUrl = directoryUrl;
        if (localConfigurators != null && !localConfigurators.isEmpty()) {
            for (Configurator configurator : localConfigurators) {
                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
            }
        }
        // 处理服务提供者 URL 集合
        // providers
        refreshInvoker(invokerUrls);
    }

这里省略refreshInvoker内容了;