欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

服务负载均衡-Dubbo篇

程序员文章站 2022-03-23 22:19:52
Dubbo负载均衡...

关键词:负载均衡    轮询算法    随机算法


Dubbo源码版本:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>dubbo</artifactId>
    <version>2.6.9</version>
</dependency>

负载均衡分为两种模式:客户端模式和服务端模式。客户端模式是服务消费者(调用方)从注册中心获取可用的服务列表,然后根据负载均衡算法选择一个服务进行调用,Dubbo中的负载均衡就是使用的客户端模式。服务端模式,是服务消费者直接调用一个服务,由该服务进行负载均衡,比如Nginx使用的是服务端模式。

一般在集群模式中,服务都是多个的。服务调用方与服务提供方一般都是多对多的关系。当一个服务调用方需要调用一个功能接口时,提供该功能接口的服务是多个,因此需要从这些服务提供者中选择一个服务进行调用。这样的过程就叫负载均衡。

负载均衡是有策略的,也就是从多个服务提供者中如何确定出一个用于本次的服务调用。这个如何确定的过程就是负载均衡策略,也叫负载均衡算法。一般有轮询、随机等。


█ 服务调用

详细的服务调用说明请戳《服务调用-Dubbo篇

通过上面的内容获取到了服务接口的代理对象,比如有这样一个接口:CatService:

public interface CatService {

    void say();

}

生成的代理对象就是这样的(伪代码):

public class CatServiceProxy implements CatService{

    @Override
    public void say() {
        // 逻辑会被转发到MockClusterInvoker的invoke方法中
        // invocation里面封装了接口、调用方法等信息
        mockClusterWrapper.invoke(invocation);

    }
}

MockClusterWrapper.invoke:

public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // mock功能,服务调用失败后的容错处理
    String value = this.directory.getUrl().getMethodParameter(invocation.getMethodName(), "mock", Boolean.FALSE.toString()).trim();
    // 使用了mock功能
    if (value.length() != 0 && !value.equalsIgnoreCase("false")) {
        if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + this.directory.getUrl());
            }

            result = this.doMockInvoke(invocation, (RpcException)null);
        } else {
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException var5) {
                if (var5.isBiz()) {
                    throw var5;
                }

                if (logger.isWarnEnabled()) {
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + this.directory.getUrl(), var5);
                }

                result = this.doMockInvoke(invocation, var5);
            }
        }
    } else {
        // 没有使用mock功能,直接调用this.invoker.invoke
        result = this.invoker.invoke(invocation);
    }

    return result;
}

this.invoker.invoke:

MockClusterWrapper中持有的是FailoverClusterInvoker,所以会调用FailoverClusterInvoker中的invoke方法,FailoverClusterInvoker继承了AbstractClusterInvoker,invoke是AbstractClusterInvoker中的方法:

public Result invoke(Invocation invocation) throws RpcException {
    ......
    LoadBalance loadbalance = null;
    ......
    // 获取服务列表
    List<Invoker<T>> invokers = this.list(invocation);
    // 获取LoadBalance,负载均衡器,可通过配置指定,默认为RandomLoadBalance
    if (invokers != null && !invokers.isEmpty()) {
        loadbalance = (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(((Invoker)invokers.get(0)).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), "loadbalance", "random"));
    }

    RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation);
    return this.doInvoke(invocation, invokers, loadbalance);
}

this.doInvoke:

doInvoke是AbstractClusterInvoker定义的抽象方法,具体要看子类实现,进入FailoverClusterInvoker的doInvoke方法:

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    ......
    // copyinvokers就是invokers,
    // invoked是已获取到的invoker列表
    // 进入负载均衡选择服务
    Invoker<T> invoker = this.select(loadbalance, invocation, copyinvokers, invoked);
    invoked.add(invoker);
    ......
}

select:

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    ......
    Invoker<T> invoker = this.doSelect(loadbalance, invocation, invokers, selected);
    ......
}

doSelect:

private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    ......
    // 调用具体的LoadBalance逻辑获取一个服务
    Invoker<T> invoker = loadbalance.select(invokers, this.getUrl(), invocation);
    ......
}

█ 负载均衡

(1)指定服务调用时的负载均衡算法:

服务调用者和服务提供者都指定时,以服务调用者配置的为准。

<dubbo:service loadbalance="xxx" />
<dubbo:reference loadbalance="xxx" />

(2)LoadBalance:接口,定义了select方法

@SPI("random")
public interface LoadBalance {
    @Adaptive({"loadbalance"})
    <T> Invoker<T> select(List<Invoker<T>> var1, URL var2, Invocation var3) throws RpcException;
}

AbstractLoadBalance:抽象类,实现LoadBalance,定义了模板方法

public abstract class AbstractLoadBalance implements LoadBalance
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    if (invokers != null && !invokers.isEmpty()) {
        // this.doSelect是抽象方法,由具体的子类实现
        return invokers.size() == 1 ? (Invoker)invokers.get(0) : this.doSelect(invokers, url, invocation);
    } else {
        return null;
    }
}

AbstractLoadBalance有四个实现类,分别对应四种负载均衡策略:RoundRobinLoadBalance(轮询)、RandomLoadBalance(随机)、ConsistentHashLoadBalance(一致性哈希)、LeastActiveLoadBalance(最小活跃数)。四个实现类实现了AbstractLoadBalance中的抽象方法doSelect。

服务负载均衡-Dubbo篇

(3)在Dubbo项目的META-INF\dubbo\internal\com.alibaba.dubbo.rpc.cluster.LoadBalance,提供默认扩展实现:

random=com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
roundrobin=com.alibaba.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
leastactive=com.alibaba.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
consistenthash=com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
  • RoundRobinLoadBalance

通过轮询进行负载均衡,比如服务列表中有A、B、C、D四个服务,四个服务按照ABCD顺序排列。如果第一次选中了A,则第二次就会选中B,再下一次选中C,再下一次选中D,然后再回到A。思想是这样的。Dubbo中的轮询负载均衡,还添加了一个权重来调整轮询的策略。看看RoundRobinLoadBalance中的代码实现:

// invokers 从注册中心获取到的所有的接口服务列表
// url 调用服务请求的URL信息
// invocation 调用接口方法信息
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // 获取服务列表中的第一个服务,根据serviceKey与方法名拼接成key
    // 如果是对一个接口方法提供服务的话,此处invokers列表中的所有key都一样。
    String key = ((Invoker)invokers.get(0)).getUrl().getServiceKey() + "." + invocation.getMethodName();
    // 缓存处理,缓存服务方法key与WeightedRoundRobin
    // WeightedRoundRobin记录所有方法的权重信息,key是接口方法信息,value是一个map,map记录的是当前接口方法下,
    // 每一个服务提供者的权重信息
    ConcurrentMap<String, RoundRobinLoadBalance.WeightedRoundRobin> map = (ConcurrentMap)this.methodWeightMap.get(key);
    // 初始化
    if (map == null) {
        this.methodWeightMap.putIfAbsent(key, new ConcurrentHashMap());
        map = (ConcurrentMap)this.methodWeightMap.get(key);
    }
    // 所有服务的总的权重值
    int totalWeight = 0;
    long maxCurrent = -9223372036854775808L;
    long now = System.currentTimeMillis();
    // 最终经过算法被选中的服务提供者
    Invoker<T> selectedInvoker = null;
    // selectedInvoker对应的WeightedRoundRobin 
    RoundRobinLoadBalance.WeightedRoundRobin selectedWRR = null;

    int weight;
    // 遍历服务提供者列表
    for(Iterator var13 = invokers.iterator(); var13.hasNext(); totalWeight += weight) {
        Invoker<T> invoker = (Invoker)var13.next();
        // 每一个服务提供者唯一标识
        String identifyString = invoker.getUrl().toIdentityString();
        // 获取当前服务提供者的权重
        RoundRobinLoadBalance.WeightedRoundRobin weightedRoundRobin = (RoundRobinLoadBalance.WeightedRoundRobin)map.get(identifyString);
        // 获取和计算当前服务提供者的权重
        weight = this.getWeight(invoker, invocation);
        if (weight < 0) {
            weight = 0;
        }
        // 对于第一次提供服务的,这边是null,要进行初始化
        if (weightedRoundRobin == null) {
            weightedRoundRobin = new RoundRobinLoadBalance.WeightedRoundRobin();
            // 将计算得到的权重值记录下来
            weightedRoundRobin.setWeight(weight);
            // 存放到map中
            map.putIfAbsent(identifyString, weightedRoundRobin);
            weightedRoundRobin = (RoundRobinLoadBalance.WeightedRoundRobin)map.get(identifyString);
        }
        // 这里是如果服务不是第一次提供,就不会进入上面的if(weightedRoundRobin == null)
        // 那么weightedRoundRobin.getWeight()得到的就是历史缓存的数据,weight是计算得到的最新的数据
        // 两种会存在不相等的可能
        if (weight != weightedRoundRobin.getWeight()) {
            // 更新成最新计算得到的权重
            weightedRoundRobin.setWeight(weight);
        }
        
        // this.current.addAndGet((long)this.weight);
        // 累加当前服务提供者的权重值
        long cur = weightedRoundRobin.increaseCurrent();
        weightedRoundRobin.setLastUpdate(now);
        // 如果累加之后的权重值大于最大的权重值-9223372036854775808L,则进入逻辑
        if (cur > maxCurrent) {
            // 更新最大权重值
            maxCurrent = cur;
            selectedInvoker = invoker;
            selectedWRR = weightedRoundRobin;
        }
    }
    
    // 服务提供者列表发生了变化,要刷新以前的缓存数据
    if (!this.updateLock.get() && invokers.size() != map.size() && this.updateLock.compareAndSet(false, true)) {
        try {
            ConcurrentMap<String, RoundRobinLoadBalance.WeightedRoundRobin> newMap = new ConcurrentHashMap();
            newMap.putAll(map);
            Iterator it = newMap.entrySet().iterator();

            while(it.hasNext()) {
                Entry<String, RoundRobinLoadBalance.WeightedRoundRobin> item = (Entry)it.next();
                if (now - ((RoundRobinLoadBalance.WeightedRoundRobin)item.getValue()).getLastUpdate() > (long)RECYCLE_PERIOD) {
                    it.remove();
                }
            }

            this.methodWeightMap.put(key, newMap);
        } finally {
            this.updateLock.set(false);
        }
    }

    if (selectedInvoker != null) {
        // this.current.addAndGet((long)(-1 * total));
        // 将当前被选中的服务提供者对应的权重值减去总权重值
        selectedWRR.sel(totalWeight);
        return selectedInvoker;
    } else {
        return (Invoker)invokers.get(0);
    }
}

关键代码是这几个有关计算值的地方:

int totalWeight = 0;
long maxCurrent = -9223372036854775808L;
// totalWeight = totalWeight + weight
for(Iterator var13 = invokers.iterator(); var13.hasNext(); totalWeight += weight) {
    weight = this.getWeight(invoker, invocation);
    // cur = cur + weight
    long cur = weightedRoundRobin.increaseCurrent();
    if (cur > maxCurrent) {
            // 更新最大权重值
            maxCurrent = cur;
            selectedInvoker = invoker;
            selectedWRR = weightedRoundRobin;
            } 
}
// cur = cur - totalWeight
selectedWRR.sel(totalWeight);

weightedRoundRobin.increaseCurrent():

// 即cur = cur + weight
public long increaseCurrent() {
    return this.current.addAndGet((long)this.weight);
}

selectedWRR.sel(totalWeight);:

// 即cur = cur - totalWeight 
// 这里减去的是所有的权重和,其实减去任意一个大于0的数效果一样
public void sel(int total) {
    this.current.addAndGet((long)(-1 * total));
}

(1)权重值相同:

服务提供者列表中一共有4个服务提供者,都使用默认权重100,则总权重值是400:

服务负载均衡-Dubbo篇

第一次调用doSelect,进入for循环,map中都没有数据:

invoker=A,weight=100,cur=cur+weigth=0+100=100,maxCurrent=-9223372036854775808L。因为cur>maxCurrent,所以maxCurrent=cur=100,selectedInvoker=A。继续:

invoker=B,weight=100,cur=cur+weigth=0+100=100,maxCurrent=100。因为cur不大于maxCurrent,进入下一个循环:

invoker=C,weight=100,cur=cur+weigth=0+100=100,maxCurrent=100。因为cur不大于maxCurrent,进入下一个循环:

invoker=D,weight=100,cur=cur+weigth=0+100=100,maxCurrent=100。因为cur不大于maxCurrent,循环结束。

循环结束,map中记录ABCD权重信息,totalWeight=400。

最终选中A,将A的cur = cur+(-1)*400 = 100-400 = -300。

 

第二次调用doSelect,进入for循环,map中都缓存了数据:

invoker = A,weight = 100,cur=cur+weigth=-300+100=-200,maxCurrent=-9223372036854775808L。因为cur>maxCurrent,所以maxCurrent=cur=-200,selectedInvoker=A。进入下一个循环:

invoker = B,weight = 100,cur=cur+weigth=100+100=200,maxCurrent=-200。因为cur>maxCurrent,所以maxCurrent=cur=200,selectedInvoker=B。进入下一个循环:

invoker = C,weight = 100,cur=cur+weigth=100+100=200,maxCurrent=200。因为cur不大于maxCurrent,进入下一个循环:

invoker = D,weight = 100,cur=cur+weigth=100+100=200,maxCurrent=200。因为cur不大于maxCurrent,循环结束。

最终选中B,将B的cur = cur+(-1)*400 = 200-400=-200。

 

第三次调用doSelect,进入for循环:

invoker = A,weight = 100,cur=cur+weigth=-200+100=-100,maxCurrent=-9223372036854775808L。因为cur>maxCurrent,所以maxCurrent=cur=-100,selectedInvoker=A。进入下一个循环:

invoker = B,weight = 100,cur=cur+weigth=-200+100=-100,maxCurrent=-100。因为cur不大于maxCurrent,进入下一个循环:

invoker = C,weight = 100,cur=cur+weigth=200+100=300,maxCurrent=-100。因为cur>maxCurrent,所以maxCurrent=cur=300,selectedInvoker=C。进入下一个循环:

invoker = D,weight = 100,cur=cur+weigth=200+100=300,maxCurrent=300。因为cur不大于maxCurrent,循环结束。

最终选中C,将C的cur = cur+(-1)*400 = -100。

 

第四次调用doSelect,进入for循环:

invoker = A,weight = 100,cur=cur+weigth=-100+100=0,maxCurrent=-9223372036854775808L。因为cur>maxCurrent,所以maxCurrent=cur=0,selectedInvoker=A。进入下一个循环:

invoker = B,weight = 100,cur=cur+weigth=-100+100=0,maxCurrent=0。因为cur不大于maxCurrent,进入下一个循环:

invoker = C,weight = 100,cur=cur+weigth=-100+100=0,maxCurrent=0。因为cur不大于maxCurrent,进入下一个循环:

invoker = D,weight = 100,cur = 400,maxCurrent=0。因为cur>maxCurrent,所以maxCurrent=cur=0,selectedInvoker=D。循环结束。

最终选中D,将D的cur = cur+(-1)*400 = 0。

 

此时ABCD四个的cur的值又都是一样的了,和初始值都是100一样。ABCD轮询完成一圈,再进行下一圈的轮询。

 

思想:

初始筹码相同(即权重),依次遍历列表,找到的第一个筹码最多的将被选出。被选出的要减少一定数量的筹码(总权重值),没被选出的就增加相同的一定数量的筹码(各自的权重)。此时因为初始筹码都相同,该增加的筹码也相同,该减少的筹码也是相同的。所以此时每个元素都有相同的机会被轮询到。

为了让增加一定数量的筹码(各自的权重)实际上会产生增加数量,代码中增加了让权重不能为负数的逻辑,因为和一个负数相加,就做了减法了:

if (weight < 0) {
    weight = 0;
}

 

(2)权重值不同:

服务提供者列表中一共有4个服务提供者,权重值不同。总权重值是510。

服务负载均衡-Dubbo篇

思想:

初始筹码不同(即权重),依次遍历列表,找到的第一个筹码最多的将被选出。被选出的要减少一定数量的筹码(总权重值),没被选出的就增加一定数量的筹码(各自的权重)。

此时情况就不一样了,被选中的是减去总权重值,所以被减数都是相同的,但是减数是不一样的,减数越大,最后剩下的就会越多。没被选中的要增加各自的权重,这个权重越大,增加的数量就会越多。所以,权重值越大的,被选中的机会就越大了。

  • RandomLoadBalance

随机的负载均衡算法,即从列表中随机选择一个。Dubbo也添加了一个权重值来调节随机算法:

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size();
    int totalWeight = 0;
    // 服务提供者的权重值是否全部相同
    boolean sameWeight = true;

    int offset;
    int i;
    // 遍历服务提供者列表
    for(offset = 0; offset < length; ++offset) {
        // 依次计算权重值
        i = this.getWeight((Invoker)invokers.get(offset), invocation);
        totalWeight += i;
        // 从第二个开始,拿当前的服务提供者的权重值与前面一个作比较。如果出现一次不相等的,就不是相同的权重值
        if (sameWeight && offset > 0 && i != this.getWeight((Invoker)invokers.get(offset - 1), invocation)) {
            sameWeight = false;
        }
    }
    // 总权重值大于0,并且权重值不相同
    if (totalWeight > 0 && !sameWeight) {
        // 根据总权重值产生一个0到totalWeight范围内的随机数
        offset = this.random.nextInt(totalWeight);
        // 循环,length是服务提供者列表的长度
        for(i = 0; i < length; ++i) {
            // offset = offect - weight
            offset -= this.getWeight((Invoker)invokers.get(i), invocation);
            if (offset < 0) {
                return (Invoker)invokers.get(i);
            }
        }
    }
    // 总权重值小于等于0,或者权重值都一样,则在列表长度范围内随机产生一个随机数
    // 获取下标是随机数的值返回
    return (Invoker)invokers.get(this.random.nextInt(length));
}

服务负载均衡-Dubbo篇

若随机数是50,用50依次减去100,60,200,150。差小于0则选出对应下标的元素。index=0

若随机数是100,用100依次减去100,60,200,150。差小于0则选出对应下标的元素。index=1

......

思想:

增加权重值来加大某个元素被选中的概率。权重值越大,被选中的概率就越大。如上图所示,得到的权重值集合可以看成一个长方形区域,按照四个元素的权重值来划分长方形的面积,权重值大的得到的面积就大。此时,产生随机数可以看成是向该长方形区域投掷飞镖,当然面积越大的被飞镖落中的概率就大了。

 

Dubbo官方的描述:

RandomLoadBalance 是加权随机算法的具体实现,它的算法思想很简单。假设我们有一组服务器 servers = [A, B, C],他们对应的权重为 weights = [5, 3, 2], 权重总和为10。现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。 接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。 权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。 只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。比如,经过一万次选择后,服务器 A 被选中的次数大约为5000次, 服务器 B 被选中的次数约为3000次,服务器 C 被选中的次数约为2000次。

 

剩下的负载均衡介绍,请戳Dubbo官网《http://dubbo.apache.org/zh/docs/v2.7/dev/source/loadbalance/

本文地址:https://blog.csdn.net/weixin_50518271/article/details/110825507