欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

dubbo源码阅读之服务目录

程序员文章站 2022-03-30 09:38:57
服务目录 服务目录对应的接口是Directory,这个接口里主要的方法是 List list(Invocation invocation) throws RpcException; 列出所有的Invoker,对于服务消费端而言,一个Invoker对应一个可用的服务提供者,底层封装了一个tcp连接。当 ......

服务目录

服务目录对应的接口是directory,这个接口里主要的方法是

list<invoker<t>> list(invocation invocation) throws rpcexception;

列出所有的invoker,对于服务消费端而言,一个invoker对应一个可用的服务提供者,底层封装了一个tcp连接。当然invoker也可以是嵌套的,一个invoker内包含了多个实际的invoker。通过cluster对象将一个服务目录封装成一个invoker,内部包含了故障转移,服务路由,负载均衡,等等相关的集群逻辑。
回到服务目录,主要包括两种服务目录,staticdirectory,registrydirectory。

  • staticdirectory。静态服务目录,顾名思义,这个目录在创建的时候就会通过构造方法传进一个invoker列表,在之后过程中这个列表不再变化。
  • registrydirectory。通过监听注册中心的服务提供者信息动态更新invoker列表的服务目录。

从上节服务引入,我们知道,不论是staticdirectory还是registrydirectory,最终都会通过cluster.join方法封装为一个invoker。由于静态服务目录的逻辑很简单,这里不再赘述,本节我们主要分析一下注册中心的服务目录。

registrydirectory概述

这个类除了继承了abstractdirectory,还实现了notifylistener接口。notifylistener接口是一个监听类,用于监听注册中心配置信息的变更事件。我们首先简单看一下registrydirectory中实现directory接口的部分代码。

abstractdirectory.list

list方法的实现放在抽象类abstractdirectory中,

public list<invoker<t>> list(invocation invocation) throws rpcexception {
    if (destroyed) {
        throw new rpcexception("directory already destroyed .url: " + geturl());
    }

    return dolist(invocation);
}

wishing就是一个状态的判断。dolist是一个模板方法,由子类实现。

registrydirectory.dolist

@override
public list<invoker<t>> dolist(invocation invocation) {
    // 当状态量forbidden为true时,服务调用被禁止
    // 什么时候forbidden为true呢??当url只有一个,且协议名称为empty时,就以为这没有服务提供者可用。
    if (forbidden) {
        // 1. no service provider 2. service providers are disabled
        throw new rpcexception(rpcexception.forbidden_exception, "no provider available from registry " +
                geturl().getaddress() + " for service " + getconsumerurl().getservicekey() + " on consumer " +
                netutils.getlocalhost() + " use dubbo version " + version.getversion() +
                ", please check status of providers(disabled, not registered or in blacklist).");
    }

    // 服务分组
    if (multigroup) {
        return this.invokers == null ? collections.emptylist() : this.invokers;
    }

    list<invoker<t>> invokers = null;
    try {
        // get invokers from cache, only runtime routers will be executed.
        // 从缓存中取出invoker列表,并经由服务路由获取相应的invoker
        invokers = routerchain.route(getconsumerurl(), invocation);
    } catch (throwable t) {
        logger.error("failed to execute router: " + geturl() + ", cause: " + t.getmessage(), t);
    }


    // fixme is there any need of failing back to constants.any_value or the first available method invokers when invokers is null?
    /*map<string, list<invoker<t>>> localmethodinvokermap = this.methodinvokermap; // local reference
    if (localmethodinvokermap != null && localmethodinvokermap.size() > 0) {
        string methodname = rpcutils.getmethodname(invocation);
        invokers = localmethodinvokermap.get(methodname);
        if (invokers == null) {
            invokers = localmethodinvokermap.get(constants.any_value);
        }
        if (invokers == null) {
            iterator<list<invoker<t>>> iterator = localmethodinvokermap.values().iterator();
            if (iterator.hasnext()) {
                invokers = iterator.next();
            }
        }
    }*/
    return invokers == null ? collections.emptylist() : invokers;
}

这个方法的主要逻辑是,首先判断服务是否可用(根据forbidden状态变量)。然后从路由链中取出invoker列表。由于服务路由并不是本节的重点,所以我们只是简单第看一下routerchain.route方法

routerchain.route

public list<invoker<t>> route(url url, invocation invocation) {
    list<invoker<t>> finalinvokers = invokers;
    for (router router : routers) {
        finalinvokers = router.route(finalinvokers, url, invocation);
    }
    return finalinvokers;
}

一次调用路由列表中的路由规则,最终返回经过多个路由规则路由过的invoker列表。类似于责任链模式,有点像web容器的过滤器,或者是spring-mvc中的拦截器,都是一个链式的调用。
实际上我们平时一般较少使用到路由功能,所以这里routers列表实际上是空的,这种情况下不用经过任何路由,直接原样返回invokers列表。而至于routerchain内部的invokers成员是哪来的,registrydirectory监听注册中心发生变更后刷新本地缓存中的invokers列表,并将其注入到routerchain对象中,我们后面会讲到。

registrydirectory.notify

接下来我们分析registrydirectory中最重要的方法,也就是监听方法,用于监听注册中心的变更事件。

public synchronized void notify(list<url> urls) {
    // 将监听到的url分类,
    // 按照协议名称或者category参数分为configurators,routers,providers三类
    map<string, list<url>> categoryurls = urls.stream()
            .filter(objects::nonnull)
            .filter(this::isvalidcategory)
            .filter(this::isnotcompatiblefor26x)
            .collect(collectors.groupingby(url -> {
                if (urlutils.isconfigurator(url)) {
                    return configurators_category;
                } else if (urlutils.isroute(url)) {
                    return routers_category;
                } else if (urlutils.isprovider(url)) {
                    return providers_category;
                }
                return "";
            }));

    // 如果有变化的configurators类别的url,那么将其转化为参数并设到成员变量configurators
    list<url> configuratorurls = categoryurls.getordefault(configurators_category, collections.emptylist());
    this.configurators = configurator.toconfigurators(configuratorurls).orelse(this.configurators);

    // 如果有变更的路由信息url,那么将其转化为router对象并覆盖原先的路由信息
    list<url> routerurls = categoryurls.getordefault(routers_category, collections.emptylist());
    torouters(routerurls).ifpresent(this::addrouters);

    // providers
    // 最后处理最重要的服务提供者变更信息,并用这些url刷新当前缓存的invoker
    list<url> providerurls = categoryurls.getordefault(providers_category, collections.emptylist());
    refreshoverrideandinvoker(providerurls);
}

首先将从注册中心获取到的最新的url进行分类,根据协议名称或者category参数将url分为三类:configurators, routers, providers,

  • configurators类型的url被转换为configurator列表,覆盖本地缓存
  • routers类型的url被转换为router列表,并被设置到routerchain对象中
  • providers类型的url则被用于接下来的创建invoker

registrydirectory.refreshoverrideandinvoker

private void refreshoverrideandinvoker(list<url> urls) {
    // mock zookeeper://xxx?mock=return null
    // 用变更的配置信息覆盖overridedirectoryurl成员变量
    overridedirectoryurl();
    // 刷新缓存中的invokers
    refreshinvoker(urls);
}

overridedirectoryurl方法的作用主要是用从注册中心以及配置中心监听到的变更的配置覆盖本地的overridedirectoryurl成员变量中的配置。我们接着往下走。

registrydirectory.refreshinvoker

// 入参invokerurls是从注册中心拉取的服务提供者url
private void refreshinvoker(list<url> invokerurls) {
    assert.notnull(invokerurls, "invokerurls should not be null");

    // 如果只有一个服务提供者,并且协议名称是empty,说明无提供者可用
    // 将状态forbidden设为true, invokers设为空列表
    if (invokerurls.size() == 1
            && invokerurls.get(0) != null
            && constants.empty_protocol.equals(invokerurls.get(0).getprotocol())) {
        this.forbidden = true; // forbid to access
        this.invokers = collections.emptylist();
        routerchain.setinvokers(this.invokers);
        destroyallinvokers(); // close all invokers
    } else {
        this.forbidden = false; // allow to access
        // 记下旧的invoker列表
        map<string, invoker<t>> oldurlinvokermap = this.urlinvokermap; // local reference
        if (invokerurls == collections.<url>emptylist()) {
            invokerurls = new arraylist<>();
        }
        // 如果从注册中心没有拉取到服务提供者信息,那么使用之前缓存的服务提供者信息
        // 这就是为什么dubbo在注册中心挂了之后消费者仍然能够调用提供者,因为消费者在本地进行了缓存
        if (invokerurls.isempty() && this.cachedinvokerurls != null) {
            invokerurls.addall(this.cachedinvokerurls);
        } else {
            this.cachedinvokerurls = new hashset<>();
            this.cachedinvokerurls.addall(invokerurls);//cached invoker urls, convenient for comparison
        }
        // 如果注册中心没有提供者信息,并且本地也没有缓存,那么就没法进行服务调用了
        if (invokerurls.isempty()) {
            return;
        }
        // 将服务提供者url转化为invoker对象存放到map中
        map<string, invoker<t>> newurlinvokermap = toinvokers(invokerurls);// translate url list to invoker map

        /**
         * if the calculation is wrong, it is not processed.
         *
         * 1. the protocol configured by the client is inconsistent with the protocol of the server.
         *    eg: consumer protocol = dubbo, provider only has other protocol services(rest).
         * 2. the registration center is not robust and pushes illegal specification data.
         *
         */
        if (collectionutils.isemptymap(newurlinvokermap)) {
            logger.error(new illegalstateexception("urls to invokers error .invokerurls.size :" + invokerurls.size() + ", invoker.size :0. urls :" + invokerurls
                    .tostring()));
            return;
        }

        list<invoker<t>> newinvokers = collections.unmodifiablelist(new arraylist<>(newurlinvokermap.values()));
        // pre-route and build cache, notice that route cache should build on original invoker list.
        // tomergemethodinvokermap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
        // 将生成的invoker列表设置到routerchain的缓存中,
        // routerchain将对这些invoker进行路由
        routerchain.setinvokers(newinvokers);
        // 处理服务分组的情况
        this.invokers = multigroup ? tomergeinvokerlist(newinvokers) : newinvokers;
        // 将缓存的invoker设置为新生成的
        this.urlinvokermap = newurlinvokermap;

        try {
            // 这里实际上求新的invoker列表和旧的差集,将不再使用的旧的invoker销毁
            destroyunusedinvokers(oldurlinvokermap, newurlinvokermap); // close the unused invoker
        } catch (exception e) {
            logger.warn("destroyunusedinvokers error. ", e);
        }
    }
}
  • 这个方法首先根据监听到的提供者url列表判断是否处于服务禁用状态,判断依据是:如果只有一个url,并且该url协议名称是empty,说明无提供者可用,将forbidden变量设为true,即禁止服务调用,
    并做一下其他的相关设置以及销毁缓存中的invoker。

  • 如果不是禁止状态,继续往下走。如果从注册中心获取到的url列表为空,那么检查本地缓存的url列表是否为空,如果缓存不为空就用缓存的列表。如果本地缓存也为空,说明无服务可用,直接返回。
  • 如果如果从注册中心获取到的url列表不为空,说明有服务可用,这时就不会再去尝试本地缓存了(因为缓存已经过期了),并且将本地缓存更新为新获取的url列表。
  • 将可用的提供者url列表转化为invoker列表。
  • 将新创建的invoker列表设置到routerchain中,这里呼应了前文提到的在dolist方法中,从routerchain对象中取出缓存的invoker列表。
  • 将本地缓存的url->invoker map更新为新创建的。
  • 最后销毁缓存中不再使用的invoker

registrydirectory.toinvokers

/**
 * turn urls into invokers, and if url has been refer, will not re-reference.
 *
 * @param urls 从注册中心拉取的服务提供者信息
 * @return invokers
 */
private map<string, invoker<t>> toinvokers(list<url> urls) {
    map<string, invoker<t>> newurlinvokermap = new hashmap<>();
    if (urls == null || urls.isempty()) {
        return newurlinvokermap;
    }
    // 用于防止对相同的url重复创建invoker
    set<string> keys = new hashset<>();
    string queryprotocols = this.querymap.get(constants.protocol_key);
    for (url providerurl : urls) {
        // if protocol is configured at the reference side, only the matching protocol is selected
        // 如果消费端配置了协议名称,那么只有符合条件的提供者url才会被使用
        // 这段代码有待商榷 ,应该先把queryprotocols处理好,避免重复做同样的工作
        if (queryprotocols != null && queryprotocols.length() > 0) {
            boolean accept = false;
            string[] acceptprotocols = queryprotocols.split(",");
            for (string acceptprotocol : acceptprotocols) {
                if (providerurl.getprotocol().equals(acceptprotocol)) {
                    accept = true;
                    break;
                }
            }
            if (!accept) {
                continue;
            }
        }
        // 如果协议名称是empty,那么忽略该条url
        if (constants.empty_protocol.equals(providerurl.getprotocol())) {
            continue;
        }
        // 如果当前classpath下找不到与提供者url中协议名称相对应的protocol类,那么打印错误日志同时忽略该条url
        if (!extensionloader.getextensionloader(protocol.class).hasextension(providerurl.getprotocol())) {
            logger.error(new illegalstateexception("unsupported protocol " + providerurl.getprotocol() +
                    " in notified url: " + providerurl + " from registry " + geturl().getaddress() +
                    " to consumer " + netutils.getlocalhost() + ", supported protocol: " +
                    extensionloader.getextensionloader(protocol.class).getsupportedextensions()));
            continue;
        }
        // 合并消费端设置的参数以及从注册中心,配置中心监听到的配置变更
        url url = mergeurl(providerurl);

        // 以全路径作为该url的唯一标识
        string key = url.tofullstring(); // the parameter urls are sorted
        if (keys.contains(key)) { // repeated url
            continue;
        }
        keys.add(key);
        // cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
        map<string, invoker<t>> localurlinvokermap = this.urlinvokermap; // local reference
        // 如果之前已经创建过该url的invoker对象,那么就不用再重复创建
        invoker<t> invoker = localurlinvokermap == null ? null : localurlinvokermap.get(key);
        if (invoker == null) { // not in the cache, refer again
            try {
                boolean enabled = true;
                // 检查disabled和enabled参数的值
                if (url.hasparameter(constants.disabled_key)) {
                    enabled = !url.getparameter(constants.disabled_key, false);
                } else {
                    enabled = url.getparameter(constants.enabled_key, true);
                }
                if (enabled) {
                    // 真正创建invoker的地方,
                    // invokerdelegate只是个简单的包装类,不需要多说
                    invoker = new invokerdelegate<>(protocol.refer(servicetype, url), url, providerurl);
                }
            } catch (throwable t) {
                logger.error("failed to refer invoker for interface:" + servicetype + ",url:(" + url + ")" + t.getmessage(), t);
            }
            if (invoker != null) { // put new invoker in cache
                newurlinvokermap.put(key, invoker);
            }
        } else {
            newurlinvokermap.put(key, invoker);
        }
    }
    keys.clear();
    return newurlinvokermap;
}
  • 首先根据协议名称检查url是否可用。url的协议必须在本地配置的协议列表中(如果没有配置就不需要做此检查);如果协议名称是empty则忽略这个url;如果当前classpath下找不到与提供者url中协议名称相对应的protocol类,那么打印错误日志同时忽略该条url
  • 合并消费端设置的参数以及从注册中心,配置中心监听到的配置变更
  • 检查disabled,enabled参数的值,判断该url是否启用,如果disabled为true则跳过该url;如果没有disabled参数,检查enabled参数,如果enabled为false则跳过该url,enabled默认是true。
  • 调用protocol.refer方法创建invoker对象。

这里需要说明一下,由于directory不是通过spi机制加载的,所以registrydirectory也不是通过extensionloader加载的,所以也就不会受到extensionloader的ioc影响。registrydirectory内部的protocol成员是在registrydirectory初始化之后通过调用setter方法设置进去的,是在registryprotocol.dorefer方法中完成的。而registryprotocol是通过extensionloader机制加载的,会受到ioc影响,所以registryprotocol实例内部的protocol成员是通过extensionloader的ioc机制自动注入的,是一个自适应的扩展类。

另外,invokerdelegate只是个简单的包装类,不需要多说。
invoker的创建最终还是通过protocol.refer方法,我们以最常用的dubbo协议为例进行分析。

dubboprotocol.refer

@override
public <t> invoker<t> refer(class<t> servicetype, url url) throws rpcexception {
    optimizeserialization(url);

    // create rpc invoker.
    dubboinvoker<t> invoker = new dubboinvoker<t>(servicetype, url, getclients(url), invokers);
    invokers.add(invoker);

    return invoker;
}

这个方法很简单,直接new了一个dubboinvoker。

dubboinvoker

看一下doinvoke方法,这个方法主要是处理了同步,异步,超时,单向调用等参数,并且对调用结果封装了异步调用,同步调用的逻辑。
真正执行远程调用的部分是靠exchangeclient实现的,再往下就是调用参数的序列化,tcp连接创建,发送报文,获取响应报文,反序列化结果等的逻辑了,本文不再深入下去。