欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

dubbo智能集群容错与负载均衡策略解析

程序员文章站 2024-03-16 21:07:16
...

目录

 

1. 集群容错

1.1 五种容错策略

1.2 集群容错源码解析

2. 负载均衡

2.1 四种负载均衡策略

2.2 负载均衡源码解析


1. 集群容错

当集群调用(服务消费方调用服务提供方的服务)失败时,Dubbo提供了多种容错方案,缺省模式为failover,也就是失败重试。

dubbo智能集群容错与负载均衡策略解析

  • 这里的 Invoker 是 Provider 的一个可调用 Service 的抽象,Invoker 封装了 Provider 地址及 Service 接口信息
  • Directory 代表多个 Invoker,可以把它看成 List<Invoker> ,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更
  • Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个
  • Router 负责从多个 Invoker 中按路由规则选出子集,比如读写分离,应用隔离等
  • LoadBalance 负责从多个 Invoker 中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选

1.1 五种容错策略

Failover Cluster:失败自动切换,当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟。可通过 retries="2" 来设置重试次数(不含第一次)。

Failfast Cluster:快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。

Failsafe Cluster:失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。

Failback Cluster:失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。

Forking Cluster:并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks="2" 来设置最大并行数。

1.2 集群容错源码解析

代码来源Dubbo2.6.2

当应用需要调用服务时,会通过invoke方法发起调用(AbstractClusterInvoker)

public Result invoke(final Invocation invocation) throws RpcException {
    //检查消费者使用dubbo版本上某个接口的Rpc集群调用程序是否被销毁
    checkWhetherDestroyed();
    LoadBalance loadbalance = null;
    //Router 负责从多个 Invoker 中按路由规则选出子集
    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && !invokers.isEmpty()) {
        //加载默认负载均衡random
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    }
    //如果异步需要附加调用ID
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    //根据Invoker子集和负载均衡策略发起调用
    return doInvoke(invocation, invokers, loadbalance);
}

protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                       LoadBalance loadbalance) throws RpcException;

AbstractClusterInvoker.invoke就是选出了Invoker子集并加载了负载均衡策略,然后去发起调用,而doInvoke的实现就是几种容错策略的实现,以FailoverClusterInvoker为例 

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    //检查invokers是否为空
    checkInvokers(copyinvokers, invocation);
    //返回重试次数
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //重试前重新选择以避免更改候选“调用者”。
        //注意:如果`invokers`改变了,那么`invoked`也会失去准确性。
        if (i > 0) {
            //再次检查
            checkWhetherDestroyed();
            copyinvokers = list(invocation);
            // check again
            checkInvokers(copyinvokers, invocation);
        }
        //根据负载均衡策略选择一个invoker
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        //记录发起过调用的地址,防止重试时调用了已经调用过的地址
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            //通过之前选出的invoker进行调用
            Result result = invoker.invoke(invocation);
            //调用成功,判断是否重试过,如果重试过,记录下警告信息,记录失败的invoker
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + invocation.getMethodName()
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyinvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            //如果是业务异常则直接抛出错误,不重试
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            //发生过调用的地址记录下来
            providers.add(invoker.getUrl().getAddress());
        }
    }
    //重试次数循环完后未调用成功,则抛出异常
    throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
            + invocation.getMethodName() + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyinvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: "
            + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}

FailoverClusterInvoker.doInvoke就是得到重试次数去循环,通过负载均衡策略在invoker子集中得到一个invoker,如果出现非业务异常就去重试

 

总结来说,智能容错步骤:

1 选出Invoker子集并加载负载均衡策略,然后去发起调用;

2 失败重试容错机制为例:获取重试次数去循环,通过负载均衡策略在invoker子集中得到一个invoker,如果出现非业务异常就去重试

2. 负载均衡

在集群负载均衡时,Dubbo 提供了多种均衡策略,智能感知下游节点健康状况,显著减少调用延迟,提高系统吞吐量,缺省为 random 随机调用。

2.1 四种负载均衡策略

Random LoadBalance: 随机,按权重设置随机概率。在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

RoundRobin LoadBalance轮询,按公约后的权重设置轮询比率。存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

LeastActive LoadBalance最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

ConsistentHash LoadBalance一致性 Hash,相同参数的请求总是发到同一提供者。当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

2.2 负载均衡源码解析

负载均衡策略的实现,select方法入参: 

loadbalance 负载均衡策略;invocation:当前调用的信息;invoker 调用者候选;

selected 本次调用过程中此次负载均衡之前选出的invoker调用失败的集合

/**
 * 使用负载均衡策略选择一个Invoker。</br>
 * a)首先,使用负载均衡选择一个Invoker。 如果此Invoker在先前选择的列表中,或者如果该Invoker不可用,则继续执行步骤b(重新选择),否则返回第一个选定的Invoker</br>
 * b)重新选择,重新选择的验证规则:选中>可用。 该规则确保选定的Invoker有最小机会成为先前选定列表中的一个,并且还确保该Invoker可用。
 */
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (invokers == null || invokers.isEmpty())
        return null;
    String methodName = invocation == null ? "" : invocation.getMethodName();

    boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
    {
        //忽略重载方法
        //如果之前有调用过的provider且现已经不存在则设置为null
        if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
            stickyInvoker = null;
        }
        //忽略并发问题
        //如果sticky为true,且之前有调用过的provider且该provider未失败则继续使用该provider
        if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
            if (availablecheck && stickyInvoker.isAvailable()) {
                return stickyInvoker;
            }
        }
    }
    //负载均衡选择
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}

doSelect方法入参:

loadbalance 负载均衡策略;invocation:当前调用的信息;invoker 调用者候选;selected 排除选择的调用者与否

private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (invokers == null || invokers.isEmpty())
        return null;
    //如果invokers子集只有一个就返回这一个
    if (invokers.size() == 1)
        return invokers.get(0);
    // 如果invokers子集只有两个invoker, 并存在调用失败,则循环使用
    if (invokers.size() == 2 && selected != null && !selected.isEmpty()) {
        return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
    }
    //如果当前负载均衡为null,则使用默认random
    if (loadbalance == null) {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    //使用负载均衡策略得到一个invoker
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    //如果selected中包含 或者 availablecheck(决定是否从集群中排除不可用的调用者)为true 则重新选择.
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            //重新选择invoker
            Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (rinvoker != null) {
                invoker = rinvoker;
            } else {
                //重新选择invoker为null,则检查当前所选调用者的索引,如果不是最后一个,则选择index + 1。
                int index = invokers.indexOf(invoker);
                try {
                    //避免碰撞
                    invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
                } catch (Exception e) {
                    logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                }
            }
        } catch (Throwable t) {
            logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
        }
    }
    return invoker;
}


public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    if (invokers == null || invokers.isEmpty())
        return null;
    if (invokers.size() == 1)
        return invokers.get(0);
    return doSelect(invokers, url, invocation);
}

protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

 负载均衡提供的select方法共有三个参数,invokers:可用的服务列表;url:包含consumer的信息;invocation:当前调用的信息。

默认的负载均衡算法为RandomLoadBalance, 随机调度算法。

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size();
    int totalWeight = 0;
    boolean sameWeight = true;
    for (int i = 0; i < length; i++) {
        //获取每个invoker的权重
        int weight = getWeight(invokers.get(i), invocation);
        //权重总和
        totalWeight += weight;
        //如果所有的invoker权重都一样则sameWeight为true,否则false
        if (sameWeight && i > 0
                && weight != getWeight(invokers.get(i - 1), invocation)) {
            sameWeight = false;
        }
    }
    //至少一个调用者的权重> 0 且 并非每个调用者的权重相同
    if (totalWeight > 0 && !sameWeight) {
        //根据totalWeight随机选择:[0,totalWeight)
        int offset = random.nextInt(totalWeight);
        //确定随机值对应的invoker
        for (int i = 0; i < length; i++) {
            offset -= getWeight(invokers.get(i), invocation);
            if (offset < 0) {
                return invokers.get(i);
            }
        }
    }
    //if条件不满足,返回[0,length)随机值
    return invokers.get(random.nextInt(length));
}

计算权重的方法

protected int getWeight(Invoker<?> invoker, Invocation invocation) {
    //获取provider配置的权重(默认100)
    int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
    if (weight > 0) {
        //获取provider 启动的时间(默认0)
        long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
        if (timestamp > 0L) {
            //获取启动时长
            int uptime = (int) (System.currentTimeMillis() - timestamp);
            //获取预热时间(默认600000,即10分钟),注意warmup不是provider的基本参数,需要通过dubbo:paramater配置
            int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
            if (uptime > 0 && uptime < warmup) {
                //如果启动时长小于预热时间:说明还在预热阶段,需要相应的减少权重
                weight = calculateWarmupWeight(uptime, warmup, weight);
            }
        }
    }
    return weight;
}

//权重计算方式为启动时长占预热时间的百分比乘以权重
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
    int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
    return ww < 1 ? 1 : (ww > weight ? weight : ww);
}

总结来说,负载均衡策略步骤:

1 如果invoker子集只有一个,则直接返回;如果有两个且之前选出的invoker调用失败的集合不为空,则退化为循环

2 使用负载均衡选择一个Invoker,随机调用为例,如果至少一个调用者的权重> 0 且 并非每个调用者的权重相同,则根据totalWeight随机选择[0,totalWeight),并确定随机数对应的invoker,否则返回length随机数[0,length)

3 如果选出的Invoker在先前选择的列表中,或者如果该Invoker不可用,则重新选择,否则返回第一个选定的Invoker

4 重新选择,重新选择的验证规则:选中>可用。 该规则确保选定的Invoker有最小机会成为先前选定列表中的一个,并且还确保该Invoker可用。