服务负载均衡-Dubbo篇
关键词:负载均衡 轮询算法 随机算法
Dubbo源码版本:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>2.6.9</version>
</dependency>
负载均衡分为两种模式:客户端模式和服务端模式。客户端模式是服务消费者(调用方)从注册中心获取可用的服务列表,然后根据负载均衡算法选择一个服务进行调用,Dubbo中的负载均衡就是使用的客户端模式。服务端模式,是服务消费者直接调用一个服务,由该服务进行负载均衡,比如Nginx使用的是服务端模式。
一般在集群模式中,服务都是多个的。服务调用方与服务提供方一般都是多对多的关系。当一个服务调用方需要调用一个功能接口时,提供该功能接口的服务是多个,因此需要从这些服务提供者中选择一个服务进行调用。这样的过程就叫负载均衡。
负载均衡是有策略的,也就是从多个服务提供者中如何确定出一个用于本次的服务调用。这个如何确定的过程就是负载均衡策略,也叫负载均衡算法。一般有轮询、随机等。
█ 服务调用
详细的服务调用说明请戳《服务调用-Dubbo篇》
通过上面的内容获取到了服务接口的代理对象,比如有这样一个接口:CatService:
public interface CatService {
void say();
}
生成的代理对象就是这样的(伪代码):
public class CatServiceProxy implements CatService{
@Override
public void say() {
// 逻辑会被转发到MockClusterInvoker的invoke方法中
// invocation里面封装了接口、调用方法等信息
mockClusterWrapper.invoke(invocation);
}
}
MockClusterWrapper.invoke:
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
// mock功能,服务调用失败后的容错处理
String value = this.directory.getUrl().getMethodParameter(invocation.getMethodName(), "mock", Boolean.FALSE.toString()).trim();
// 使用了mock功能
if (value.length() != 0 && !value.equalsIgnoreCase("false")) {
if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + this.directory.getUrl());
}
result = this.doMockInvoke(invocation, (RpcException)null);
} else {
try {
result = this.invoker.invoke(invocation);
} catch (RpcException var5) {
if (var5.isBiz()) {
throw var5;
}
if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + this.directory.getUrl(), var5);
}
result = this.doMockInvoke(invocation, var5);
}
}
} else {
// 没有使用mock功能,直接调用this.invoker.invoke
result = this.invoker.invoke(invocation);
}
return result;
}
this.invoker.invoke:
MockClusterWrapper中持有的是FailoverClusterInvoker,所以会调用FailoverClusterInvoker中的invoke方法,FailoverClusterInvoker继承了AbstractClusterInvoker,invoke是AbstractClusterInvoker中的方法:
public Result invoke(Invocation invocation) throws RpcException {
......
LoadBalance loadbalance = null;
......
// 获取服务列表
List<Invoker<T>> invokers = this.list(invocation);
// 获取LoadBalance,负载均衡器,可通过配置指定,默认为RandomLoadBalance
if (invokers != null && !invokers.isEmpty()) {
loadbalance = (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(((Invoker)invokers.get(0)).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), "loadbalance", "random"));
}
RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation);
return this.doInvoke(invocation, invokers, loadbalance);
}
this.doInvoke:
doInvoke是AbstractClusterInvoker定义的抽象方法,具体要看子类实现,进入FailoverClusterInvoker的doInvoke方法:
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
......
// copyinvokers就是invokers,
// invoked是已获取到的invoker列表
// 进入负载均衡选择服务
Invoker<T> invoker = this.select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
......
}
select:
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
......
Invoker<T> invoker = this.doSelect(loadbalance, invocation, invokers, selected);
......
}
doSelect:
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
......
// 调用具体的LoadBalance逻辑获取一个服务
Invoker<T> invoker = loadbalance.select(invokers, this.getUrl(), invocation);
......
}
█ 负载均衡
(1)指定服务调用时的负载均衡算法:
服务调用者和服务提供者都指定时,以服务调用者配置的为准。
<dubbo:service loadbalance="xxx" />
<dubbo:reference loadbalance="xxx" />
(2)LoadBalance:接口,定义了select方法
@SPI("random")
public interface LoadBalance {
@Adaptive({"loadbalance"})
<T> Invoker<T> select(List<Invoker<T>> var1, URL var2, Invocation var3) throws RpcException;
}
AbstractLoadBalance:抽象类,实现LoadBalance,定义了模板方法
public abstract class AbstractLoadBalance implements LoadBalance
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers != null && !invokers.isEmpty()) {
// this.doSelect是抽象方法,由具体的子类实现
return invokers.size() == 1 ? (Invoker)invokers.get(0) : this.doSelect(invokers, url, invocation);
} else {
return null;
}
}
AbstractLoadBalance有四个实现类,分别对应四种负载均衡策略:RoundRobinLoadBalance(轮询)、RandomLoadBalance(随机)、ConsistentHashLoadBalance(一致性哈希)、LeastActiveLoadBalance(最小活跃数)。四个实现类实现了AbstractLoadBalance中的抽象方法doSelect。
(3)在Dubbo项目的META-INF\dubbo\internal\com.alibaba.dubbo.rpc.cluster.LoadBalance,提供默认扩展实现:
random=com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
roundrobin=com.alibaba.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
leastactive=com.alibaba.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
consistenthash=com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
- RoundRobinLoadBalance:
通过轮询进行负载均衡,比如服务列表中有A、B、C、D四个服务,四个服务按照ABCD顺序排列。如果第一次选中了A,则第二次就会选中B,再下一次选中C,再下一次选中D,然后再回到A。思想是这样的。Dubbo中的轮询负载均衡,还添加了一个权重来调整轮询的策略。看看RoundRobinLoadBalance中的代码实现:
// invokers 从注册中心获取到的所有的接口服务列表
// url 调用服务请求的URL信息
// invocation 调用接口方法信息
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 获取服务列表中的第一个服务,根据serviceKey与方法名拼接成key
// 如果是对一个接口方法提供服务的话,此处invokers列表中的所有key都一样。
String key = ((Invoker)invokers.get(0)).getUrl().getServiceKey() + "." + invocation.getMethodName();
// 缓存处理,缓存服务方法key与WeightedRoundRobin
// WeightedRoundRobin记录所有方法的权重信息,key是接口方法信息,value是一个map,map记录的是当前接口方法下,
// 每一个服务提供者的权重信息
ConcurrentMap<String, RoundRobinLoadBalance.WeightedRoundRobin> map = (ConcurrentMap)this.methodWeightMap.get(key);
// 初始化
if (map == null) {
this.methodWeightMap.putIfAbsent(key, new ConcurrentHashMap());
map = (ConcurrentMap)this.methodWeightMap.get(key);
}
// 所有服务的总的权重值
int totalWeight = 0;
long maxCurrent = -9223372036854775808L;
long now = System.currentTimeMillis();
// 最终经过算法被选中的服务提供者
Invoker<T> selectedInvoker = null;
// selectedInvoker对应的WeightedRoundRobin
RoundRobinLoadBalance.WeightedRoundRobin selectedWRR = null;
int weight;
// 遍历服务提供者列表
for(Iterator var13 = invokers.iterator(); var13.hasNext(); totalWeight += weight) {
Invoker<T> invoker = (Invoker)var13.next();
// 每一个服务提供者唯一标识
String identifyString = invoker.getUrl().toIdentityString();
// 获取当前服务提供者的权重
RoundRobinLoadBalance.WeightedRoundRobin weightedRoundRobin = (RoundRobinLoadBalance.WeightedRoundRobin)map.get(identifyString);
// 获取和计算当前服务提供者的权重
weight = this.getWeight(invoker, invocation);
if (weight < 0) {
weight = 0;
}
// 对于第一次提供服务的,这边是null,要进行初始化
if (weightedRoundRobin == null) {
weightedRoundRobin = new RoundRobinLoadBalance.WeightedRoundRobin();
// 将计算得到的权重值记录下来
weightedRoundRobin.setWeight(weight);
// 存放到map中
map.putIfAbsent(identifyString, weightedRoundRobin);
weightedRoundRobin = (RoundRobinLoadBalance.WeightedRoundRobin)map.get(identifyString);
}
// 这里是如果服务不是第一次提供,就不会进入上面的if(weightedRoundRobin == null)
// 那么weightedRoundRobin.getWeight()得到的就是历史缓存的数据,weight是计算得到的最新的数据
// 两种会存在不相等的可能
if (weight != weightedRoundRobin.getWeight()) {
// 更新成最新计算得到的权重
weightedRoundRobin.setWeight(weight);
}
// this.current.addAndGet((long)this.weight);
// 累加当前服务提供者的权重值
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
// 如果累加之后的权重值大于最大的权重值-9223372036854775808L,则进入逻辑
if (cur > maxCurrent) {
// 更新最大权重值
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
}
// 服务提供者列表发生了变化,要刷新以前的缓存数据
if (!this.updateLock.get() && invokers.size() != map.size() && this.updateLock.compareAndSet(false, true)) {
try {
ConcurrentMap<String, RoundRobinLoadBalance.WeightedRoundRobin> newMap = new ConcurrentHashMap();
newMap.putAll(map);
Iterator it = newMap.entrySet().iterator();
while(it.hasNext()) {
Entry<String, RoundRobinLoadBalance.WeightedRoundRobin> item = (Entry)it.next();
if (now - ((RoundRobinLoadBalance.WeightedRoundRobin)item.getValue()).getLastUpdate() > (long)RECYCLE_PERIOD) {
it.remove();
}
}
this.methodWeightMap.put(key, newMap);
} finally {
this.updateLock.set(false);
}
}
if (selectedInvoker != null) {
// this.current.addAndGet((long)(-1 * total));
// 将当前被选中的服务提供者对应的权重值减去总权重值
selectedWRR.sel(totalWeight);
return selectedInvoker;
} else {
return (Invoker)invokers.get(0);
}
}
关键代码是这几个有关计算值的地方:
int totalWeight = 0;
long maxCurrent = -9223372036854775808L;
// totalWeight = totalWeight + weight
for(Iterator var13 = invokers.iterator(); var13.hasNext(); totalWeight += weight) {
weight = this.getWeight(invoker, invocation);
// cur = cur + weight
long cur = weightedRoundRobin.increaseCurrent();
if (cur > maxCurrent) {
// 更新最大权重值
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
}
// cur = cur - totalWeight
selectedWRR.sel(totalWeight);
weightedRoundRobin.increaseCurrent():
// 即cur = cur + weight
public long increaseCurrent() {
return this.current.addAndGet((long)this.weight);
}
selectedWRR.sel(totalWeight);:
// 即cur = cur - totalWeight
// 这里减去的是所有的权重和,其实减去任意一个大于0的数效果一样
public void sel(int total) {
this.current.addAndGet((long)(-1 * total));
}
(1)权重值相同:
服务提供者列表中一共有4个服务提供者,都使用默认权重100,则总权重值是400:
第一次调用doSelect,进入for循环,map中都没有数据:
invoker=A,weight=100,cur=cur+weigth=0+100=100,maxCurrent=-9223372036854775808L。因为cur>maxCurrent,所以maxCurrent=cur=100,selectedInvoker=A。继续:
invoker=B,weight=100,cur=cur+weigth=0+100=100,maxCurrent=100。因为cur不大于maxCurrent,进入下一个循环:
invoker=C,weight=100,cur=cur+weigth=0+100=100,maxCurrent=100。因为cur不大于maxCurrent,进入下一个循环:
invoker=D,weight=100,cur=cur+weigth=0+100=100,maxCurrent=100。因为cur不大于maxCurrent,循环结束。
循环结束,map中记录ABCD权重信息,totalWeight=400。
最终选中A,将A的cur = cur+(-1)*400 = 100-400 = -300。
第二次调用doSelect,进入for循环,map中都缓存了数据:
invoker = A,weight = 100,cur=cur+weigth=-300+100=-200,maxCurrent=-9223372036854775808L。因为cur>maxCurrent,所以maxCurrent=cur=-200,selectedInvoker=A。进入下一个循环:
invoker = B,weight = 100,cur=cur+weigth=100+100=200,maxCurrent=-200。因为cur>maxCurrent,所以maxCurrent=cur=200,selectedInvoker=B。进入下一个循环:
invoker = C,weight = 100,cur=cur+weigth=100+100=200,maxCurrent=200。因为cur不大于maxCurrent,进入下一个循环:
invoker = D,weight = 100,cur=cur+weigth=100+100=200,maxCurrent=200。因为cur不大于maxCurrent,循环结束。
最终选中B,将B的cur = cur+(-1)*400 = 200-400=-200。
第三次调用doSelect,进入for循环:
invoker = A,weight = 100,cur=cur+weigth=-200+100=-100,maxCurrent=-9223372036854775808L。因为cur>maxCurrent,所以maxCurrent=cur=-100,selectedInvoker=A。进入下一个循环:
invoker = B,weight = 100,cur=cur+weigth=-200+100=-100,maxCurrent=-100。因为cur不大于maxCurrent,进入下一个循环:
invoker = C,weight = 100,cur=cur+weigth=200+100=300,maxCurrent=-100。因为cur>maxCurrent,所以maxCurrent=cur=300,selectedInvoker=C。进入下一个循环:
invoker = D,weight = 100,cur=cur+weigth=200+100=300,maxCurrent=300。因为cur不大于maxCurrent,循环结束。
最终选中C,将C的cur = cur+(-1)*400 = -100。
第四次调用doSelect,进入for循环:
invoker = A,weight = 100,cur=cur+weigth=-100+100=0,maxCurrent=-9223372036854775808L。因为cur>maxCurrent,所以maxCurrent=cur=0,selectedInvoker=A。进入下一个循环:
invoker = B,weight = 100,cur=cur+weigth=-100+100=0,maxCurrent=0。因为cur不大于maxCurrent,进入下一个循环:
invoker = C,weight = 100,cur=cur+weigth=-100+100=0,maxCurrent=0。因为cur不大于maxCurrent,进入下一个循环:
invoker = D,weight = 100,cur = 400,maxCurrent=0。因为cur>maxCurrent,所以maxCurrent=cur=0,selectedInvoker=D。循环结束。
最终选中D,将D的cur = cur+(-1)*400 = 0。
此时ABCD四个的cur的值又都是一样的了,和初始值都是100一样。ABCD轮询完成一圈,再进行下一圈的轮询。
思想:
初始筹码相同(即权重),依次遍历列表,找到的第一个筹码最多的将被选出。被选出的要减少一定数量的筹码(总权重值),没被选出的就增加相同的一定数量的筹码(各自的权重)。此时因为初始筹码都相同,该增加的筹码也相同,该减少的筹码也是相同的。所以此时每个元素都有相同的机会被轮询到。
为了让增加一定数量的筹码(各自的权重)实际上会产生增加数量,代码中增加了让权重不能为负数的逻辑,因为和一个负数相加,就做了减法了:
if (weight < 0) {
weight = 0;
}
(2)权重值不同:
服务提供者列表中一共有4个服务提供者,权重值不同。总权重值是510。
思想:
初始筹码不同(即权重),依次遍历列表,找到的第一个筹码最多的将被选出。被选出的要减少一定数量的筹码(总权重值),没被选出的就增加一定数量的筹码(各自的权重)。
此时情况就不一样了,被选中的是减去总权重值,所以被减数都是相同的,但是减数是不一样的,减数越大,最后剩下的就会越多。没被选中的要增加各自的权重,这个权重越大,增加的数量就会越多。所以,权重值越大的,被选中的机会就越大了。
- RandomLoadBalance:
随机的负载均衡算法,即从列表中随机选择一个。Dubbo也添加了一个权重值来调节随机算法:
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size();
int totalWeight = 0;
// 服务提供者的权重值是否全部相同
boolean sameWeight = true;
int offset;
int i;
// 遍历服务提供者列表
for(offset = 0; offset < length; ++offset) {
// 依次计算权重值
i = this.getWeight((Invoker)invokers.get(offset), invocation);
totalWeight += i;
// 从第二个开始,拿当前的服务提供者的权重值与前面一个作比较。如果出现一次不相等的,就不是相同的权重值
if (sameWeight && offset > 0 && i != this.getWeight((Invoker)invokers.get(offset - 1), invocation)) {
sameWeight = false;
}
}
// 总权重值大于0,并且权重值不相同
if (totalWeight > 0 && !sameWeight) {
// 根据总权重值产生一个0到totalWeight范围内的随机数
offset = this.random.nextInt(totalWeight);
// 循环,length是服务提供者列表的长度
for(i = 0; i < length; ++i) {
// offset = offect - weight
offset -= this.getWeight((Invoker)invokers.get(i), invocation);
if (offset < 0) {
return (Invoker)invokers.get(i);
}
}
}
// 总权重值小于等于0,或者权重值都一样,则在列表长度范围内随机产生一个随机数
// 获取下标是随机数的值返回
return (Invoker)invokers.get(this.random.nextInt(length));
}
若随机数是50,用50依次减去100,60,200,150。差小于0则选出对应下标的元素。index=0
若随机数是100,用100依次减去100,60,200,150。差小于0则选出对应下标的元素。index=1
......
思想:
增加权重值来加大某个元素被选中的概率。权重值越大,被选中的概率就越大。如上图所示,得到的权重值集合可以看成一个长方形区域,按照四个元素的权重值来划分长方形的面积,权重值大的得到的面积就大。此时,产生随机数可以看成是向该长方形区域投掷飞镖,当然面积越大的被飞镖落中的概率就大了。
Dubbo官方的描述:
RandomLoadBalance 是加权随机算法的具体实现,它的算法思想很简单。假设我们有一组服务器 servers = [A, B, C],他们对应的权重为 weights = [5, 3, 2], 权重总和为10。现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。 接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。 权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。 只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。比如,经过一万次选择后,服务器 A 被选中的次数大约为5000次, 服务器 B 被选中的次数约为3000次,服务器 C 被选中的次数约为2000次。
剩下的负载均衡介绍,请戳Dubbo官网《http://dubbo.apache.org/zh/docs/v2.7/dev/source/loadbalance/》
本文地址:https://blog.csdn.net/weixin_50518271/article/details/110825507