欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

dubbo源码阅读之集群(故障处理策略)

程序员文章站 2022-07-10 23:50:06
dubbo集群概述 dubbo集群功能的切入点在ReferenceConfig.createProxy方法以及Protocol.refer方法中。 在ReferenceConfig.createProxy方法中,如果用户指定多个提供者url或注册中心url,那么会创建多个Invoker,然后用Sta ......

dubbo集群概述

dubbo集群功能的切入点在referenceconfig.createproxy方法以及protocol.refer方法中。
在referenceconfig.createproxy方法中,如果用户指定多个提供者url或注册中心url,那么会创建多个invoker,然后用staticdirectory将这多个invoker封装在一起,然后用相应的cluster实现类将这个静态的服务目录包装成一个invoker,每种集群类都对应一种invoker的集群包装类,例如,failoverclusterinvoker,failbackclusterinvoker,failfastclusterinvoker,failsafeclusterinvoker,forkingclusterinvoker等等,而这些封装集群逻辑的invoker包装类都继承自abstractclusterinvoker抽象类。这个抽象类里主要实现了调用时的状态检查,invocation类参数设置,负载均衡,服务提供者可用性检测等逻辑,而服务调用失败后的行为逻辑则交由子类实现。

abstractclusterinvoker.invoke

首先我们从这个方法看起,这个方法是invoker类的调用入口,

@override
// 这个方法的主要作用是为调用做一些前置工作,
// 包括检查状态,设置参数,从服务目录取出invoker列表,根据<方法名>.loadbalance参数值获取相应的负载均衡器
// 最后调用模板方法
public result invoke(final invocation invocation) throws rpcexception {
    // 检查该invoker是否已经被销毁
    // 在监听到注册中心变更刷新invoker列表时可能会销毁不再可用的invoker
    checkwhetherdestroyed();

    // binding attachments into invocation.
    // 将rpccontext中的参数绑定到invocation上
    // 用户可以通过rpccontext向每次调用传递不同的参数
    map<string, string> contextattachments = rpccontext.getcontext().getattachments();
    if (contextattachments != null && contextattachments.size() != 0) {
        ((rpcinvocation) invocation).addattachments(contextattachments);
    }

    // 列出所有的服务提供者
    // 这个方法直接调用服务目录的list方法
    list<invoker<t>> invokers = list(invocation);
    // 根据url中的loadbalance参数值获取相应的负载均衡器,默认是随机负载均衡randomloadbalance
    loadbalance loadbalance = initloadbalance(invokers, invocation);
    // 添加调用id,唯一标识本次调用
    rpcutils.attachinvocationidifasync(geturl(), invocation);
    // 模板方法,子类实现
    return doinvoke(invocation, invokers, loadbalance);
}

failoverclusterinvoker.doinvoke

我们以默认的集群类failoverclusterinvoker为例,分析一下这个类的doinvoke方法

// 这个方法主要实现了重试的逻辑,这也正是这个类的特性,故障转移功能
public result doinvoke(invocation invocation, final list<invoker<t>> invokers, loadbalance loadbalance) throws rpcexception {
    // 拷贝一份本地引用,invokers可能会变
    list<invoker<t>> copyinvokers = invokers;
    // 检查提供者列表是否为空
    checkinvokers(copyinvokers, invocation);
    string methodname = rpcutils.getmethodname(invocation);
    // 获取调用的方法的retries参数值,重试次数等于该值+1,因为第一次调用不算重试
    int len = geturl().getmethodparameter(methodname, constants.retries_key, constants.default_retries) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    // 循环重试
    // 记录最后一次出现的异常
    rpcexception le = null; // last exception.
    // 记录调用失败的提供者
    list<invoker<t>> invoked = new arraylist<invoker<t>>(copyinvokers.size()); // invoked invokers.
    // 记录调用过的提供者的地址,
    set<string> providers = new hashset<string>(len);
    for (int i = 0; i < len; i++) {
        //reselect before retry to avoid a change of candidate `invokers`.
        //note: if `invokers` changed, then `invoked` also lose accuracy.
        // 每次循环都要重新检查状态,重新列出可用的提供者invoker,并检查可用的invoker是否为空
        // 因为这些状态或提供者信息随时都可能发生变化
        if (i > 0) {
            checkwhetherdestroyed();
            copyinvokers = list(invocation);
            // check again
            checkinvokers(copyinvokers, invocation);
        }
        // 从可用的invoker列表总选择一个
        // 选择逻辑中考虑了“粘滞”调用和负载均衡的逻辑
        invoker<t> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        // 添加到已经调用的列表中
        invoked.add(invoker);
        rpccontext.getcontext().setinvokers((list) invoked);
        try {
            result result = invoker.invoke(invocation);
            if (le != null && logger.iswarnenabled()) {
                logger.warn("although retry the method " + methodname
                        + " in the service " + getinterface().getname()
                        + " was successful by the provider " + invoker.geturl().getaddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyinvokers.size()
                        + ") from the registry " + directory.geturl().getaddress()
                        + " on the consumer " + netutils.getlocalhost()
                        + " using the dubbo version " + version.getversion() + ". last error is: "
                        + le.getmessage(), le);
            }
            return result;
        } catch (rpcexception e) {
            // 对于业务异常直接抛出,这个异常会穿透dubbo框架直接抛给用户
            // 非业务异常例如网络问题,连接断开,提供者下线等可以通过故障转移,重试机制解决,
            // 这里之所以直接抛出是因为一旦发生了业务异常就不是dubbo框架能处理的了,再重试也没有意义了
            if (e.isbiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (throwable e) {
            le = new rpcexception(e.getmessage(), e);
        } finally {
            providers.add(invoker.geturl().getaddress());
        }
    }
    throw new rpcexception(le.getcode(), "failed to invoke the method "
            + methodname + " in the service " + getinterface().getname()
            + ". tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyinvokers.size()
            + ") from the registry " + directory.geturl().getaddress()
            + " on the consumer " + netutils.getlocalhost() + " using the dubbo version "
            + version.getversion() + ". last error is: "
            + le.getmessage(), le.getcause() != null ? le.getcause() : le);
}

这个方法的逻辑还是比较清晰的,就是重试,这也就是这个这个类的主要功能,故障转移,如果调用发生异常,就重试调用其他可用的提供者。其中select方法的实现在抽象类abstractclusterinvoker中。

abstractclusterinvoker.select

// 这个方法主要实现了“粘滞”调用的逻辑
protected invoker<t> select(loadbalance loadbalance, invocation invocation,
                            list<invoker<t>> invokers, list<invoker<t>> selected) throws rpcexception {

    if (collectionutils.isempty(invokers)) {
        return null;
    }
    string methodname = invocation == null ? stringutils.empty : invocation.getmethodname();

    // 可以通过在url中设置sticky参数的值来决定要不要启用“粘滞”调用的特性
    // 默认不启用该特性
    boolean sticky = invokers.get(0).geturl()
            .getmethodparameter(methodname, constants.cluster_sticky_key, constants.default_cluster_sticky);

    //ignore overloaded method
    // 如果缓存的粘滞invoker已经不在可用列表里了,那么就应当将其移除
    if (stickyinvoker != null && !invokers.contains(stickyinvoker)) {
        stickyinvoker = null;
    }
    //ignore concurrency problem
    // 如果启用了粘滞调用,并且粘滞调用存在,并且粘滞的invoker不在已经调用失败的invoker列表中
    // 那么直接返回粘滞的invoker
    if (sticky && stickyinvoker != null && (selected == null || !selected.contains(stickyinvoker))) {
        if (availablecheck && stickyinvoker.isavailable()) {
            return stickyinvoker;
        }
    }

    // 根据负载均衡策略选择一个invoker
    invoker<t> invoker = doselect(loadbalance, invocation, invokers, selected);

    // 设置粘滞的invoker
    if (sticky) {
        stickyinvoker = invoker;
    }
    return invoker;
}

这个方法主要实现了“粘滞”调用的逻辑。

abstractclusterinvoker.doselect

// 根据负载均衡策略选择一个invoker
private invoker<t> doselect(loadbalance loadbalance, invocation invocation,
                            list<invoker<t>> invokers, list<invoker<t>> selected) throws rpcexception {

    if (collectionutils.isempty(invokers)) {
        return null;
    }
    if (invokers.size() == 1) {
        return invokers.get(0);
    }
    // 根据负载均衡策略选择一个invoker
    invoker<t> invoker = loadbalance.select(invokers, geturl(), invocation);

    //if the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
    // 对于选择出来的invoker还要再判断其可用性
    // 对于如下情况需要再次选择invoker
    // 1. 选出的invoker在调用失败列表中
    // 2. 设置了可用检查为true并且选出的invoker不可用
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isavailable() && geturl() != null && availablecheck)) {
        try {
            // 重新选择invoker, 首先排除调用失败列表进行选择,实在不行会去调用失败列表中看能不能找到又“活过来”的提供者
            invoker<t> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (rinvoker != null) {
                invoker = rinvoker;
            } else {
                //check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                int index = invokers.indexof(invoker);
                try {
                    //avoid collision
                    // 如果没有重选出新的invoker,那么直接用下一个invoker
                    invoker = invokers.get((index + 1) % invokers.size());
                } catch (exception e) {
                    logger.warn(e.getmessage() + " may because invokers list dynamic change, ignore.", e);
                }
            }
        } catch (throwable t) {
            logger.error("cluster reselect fail reason is :" + t.getmessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
        }
    }
    return invoker;
}

第一次选择是不考虑调用失败列表的,所以选出来的invoker有可能在调用失败列表中,这时需要进行重选。

abstractclusterinvoker.reselect

private invoker<t> reselect(loadbalance loadbalance, invocation invocation,
                            list<invoker<t>> invokers, list<invoker<t>> selected, boolean availablecheck) throws rpcexception {

    //allocating one in advance, this list is certain to be used.
    list<invoker<t>> reselectinvokers = new arraylist<>(
            invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

    // first, try picking a invoker not in `selected`.
    for (invoker<t> invoker : invokers) {
        if (availablecheck && !invoker.isavailable()) {
            continue;
        }

        // 排除调用失败列表中的invoker
        if (selected == null || !selected.contains(invoker)) {
            reselectinvokers.add(invoker);
        }
    }

    // 如果还有剩余的invoker, 那么根据负载均衡逻策略选择一个
    if (!reselectinvokers.isempty()) {
        return loadbalance.select(reselectinvokers, geturl(), invocation);
    }

    // just pick an available invoker using loadbalance policy
    // 是在没有可用的,只能从调用失败列表中找找看有没有可用的
    // 因为在重试期间有可能之前调用失败的提供者变成可用的了
    if (selected != null) {
        for (invoker<t> invoker : selected) {
            if ((invoker.isavailable()) // available first
                    && !reselectinvokers.contains(invoker)) {
                reselectinvokers.add(invoker);
            }
        }
    }
    // 再次选择
    if (!reselectinvokers.isempty()) {
        return loadbalance.select(reselectinvokers, geturl(), invocation);
    }

    // 实在没有可用的提供者,只能返回null了
    return null;
}

其实从这几个选择的方法中可以看出来,dubbo的作者还是很用心的,尽最大可能保证调用的成功。

failfastclusterinvoker

快速失败,只调用一次,失败后直接抛异常。代码很简单,就不多说了

public result doinvoke(invocation invocation, list<invoker<t>> invokers, loadbalance loadbalance) throws rpcexception {
    checkinvokers(invokers, invocation);
    invoker<t> invoker = select(loadbalance, invocation, invokers, null);
    try {
        return invoker.invoke(invocation);
    } catch (throwable e) {
        if (e instanceof rpcexception && ((rpcexception) e).isbiz()) { // biz exception.
            throw (rpcexception) e;
        }
        throw new rpcexception(e instanceof rpcexception ? ((rpcexception) e).getcode() : 0,
                "failfast invoke providers " + invoker.geturl() + " " + loadbalance.getclass().getsimplename()
                        + " select from all providers " + invokers + " for service " + getinterface().getname()
                        + " method " + invocation.getmethodname() + " on consumer " + netutils.getlocalhost()
                        + " use dubbo version " + version.getversion()
                        + ", but no luck to perform the invocation. last error is: " + e.getmessage(),
                e.getcause() != null ? e.getcause() : e);
    }
}

failsafeclusterinvoker

失败安全的故障处理策略,所谓失败安全是指在调用失败后,不抛异常只记录日志。

@override
public result doinvoke(invocation invocation, list<invoker<t>> invokers, loadbalance loadbalance) throws rpcexception {
    try {
        checkinvokers(invokers, invocation);
        invoker<t> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (throwable e) {
        logger.error("failsafe ignore exception: " + e.getmessage(), e);
        // 返回一个空结果,用户需要对返回结果进行判断
        return new rpcresult(); // ignore
    }
}

failbackclusterinvoker

失败后记录下失败的调用,之后以一定的间隔时间进行重试,这种策略很适合通知类的服务调用。重试间隔固定为5秒, 重试次数可以通过参数设置,默认是3次。

forkingclusterinvoker

这种策略比较有意思,每次调用都会起多个线程并行第跑,谁先跑出结果就用谁的,这种估计很少用吧,谁这么财大气粗,大把大把的资源用来浪费。
不过这很像一些分布式计算框架中的推测执行策略,如果有些任务跑的慢,那么就会在其他节点也跑这个任务,谁先跑完就用谁的结果,比如spark中就有推测执行的机制。

总结

不同的集群包装类有不同的故障处理策略,默认的故障转移,此外常用的有快速失败,失败安全,定时重试,合并调用等等。