dubbo智能集群容错与负载均衡策略解析
目录
1. 集群容错
当集群调用(服务消费方调用服务提供方的服务)失败时,Dubbo提供了多种容错方案,缺省模式为failover,也就是失败重试。
- 这里的
Invoker
是Provider
的一个可调用Service
的抽象,Invoker
封装了Provider
地址及Service
接口信息 -
Directory
代表多个Invoker
,可以把它看成List<Invoker>
,但与List
不同的是,它的值可能是动态变化的,比如注册中心推送变更 -
Cluster
将Directory
中的多个Invoker
伪装成一个Invoker
,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个 -
Router
负责从多个Invoker
中按路由规则选出子集,比如读写分离,应用隔离等 -
LoadBalance
负责从多个Invoker
中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选
1.1 五种容错策略
Failover Cluster:失败自动切换,当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟。可通过 retries="2"
来设置重试次数(不含第一次)。
Failfast Cluster:快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。
Failsafe Cluster:失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。
Failback Cluster:失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
Forking Cluster:并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks="2"
来设置最大并行数。
1.2 集群容错源码解析
代码来源Dubbo2.6.2
当应用需要调用服务时,会通过invoke方法发起调用(AbstractClusterInvoker)
public Result invoke(final Invocation invocation) throws RpcException {
//检查消费者使用dubbo版本上某个接口的Rpc集群调用程序是否被销毁
checkWhetherDestroyed();
LoadBalance loadbalance = null;
//Router 负责从多个 Invoker 中按路由规则选出子集
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
//加载默认负载均衡random
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
}
//如果异步需要附加调用ID
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
//根据Invoker子集和负载均衡策略发起调用
return doInvoke(invocation, invokers, loadbalance);
}
protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
LoadBalance loadbalance) throws RpcException;
AbstractClusterInvoker.invoke就是选出了Invoker子集并加载了负载均衡策略,然后去发起调用,而doInvoke的实现就是几种容错策略的实现,以FailoverClusterInvoker为例
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
//检查invokers是否为空
checkInvokers(copyinvokers, invocation);
//返回重试次数
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//重试前重新选择以避免更改候选“调用者”。
//注意:如果`invokers`改变了,那么`invoked`也会失去准确性。
if (i > 0) {
//再次检查
checkWhetherDestroyed();
copyinvokers = list(invocation);
// check again
checkInvokers(copyinvokers, invocation);
}
//根据负载均衡策略选择一个invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
//记录发起过调用的地址,防止重试时调用了已经调用过的地址
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
//通过之前选出的invoker进行调用
Result result = invoker.invoke(invocation);
//调用成功,判断是否重试过,如果重试过,记录下警告信息,记录失败的invoker
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
//如果是业务异常则直接抛出错误,不重试
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
//发生过调用的地址记录下来
providers.add(invoker.getUrl().getAddress());
}
}
//重试次数循环完后未调用成功,则抛出异常
throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
+ invocation.getMethodName() + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}
FailoverClusterInvoker.doInvoke就是得到重试次数去循环,通过负载均衡策略在invoker子集中得到一个invoker,如果出现非业务异常就去重试
总结来说,智能容错步骤:
1 选出Invoker子集并加载负载均衡策略,然后去发起调用;
2 失败重试容错机制为例:获取重试次数去循环,通过负载均衡策略在invoker子集中得到一个invoker,如果出现非业务异常就去重试
2. 负载均衡
在集群负载均衡时,Dubbo 提供了多种均衡策略,智能感知下游节点健康状况,显著减少调用延迟,提高系统吞吐量,缺省为 random
随机调用。
2.1 四种负载均衡策略
Random LoadBalance: 随机,按权重设置随机概率。在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
RoundRobin LoadBalance:轮询,按公约后的权重设置轮询比率。存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。
LeastActive LoadBalance:最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。
ConsistentHash LoadBalance:一致性 Hash,相同参数的请求总是发到同一提供者。当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
2.2 负载均衡源码解析
负载均衡策略的实现,select方法入参:
loadbalance 负载均衡策略;invocation:当前调用的信息;invoker 调用者候选;
selected 本次调用过程中此次负载均衡之前选出的invoker调用失败的集合
/**
* 使用负载均衡策略选择一个Invoker。</br>
* a)首先,使用负载均衡选择一个Invoker。 如果此Invoker在先前选择的列表中,或者如果该Invoker不可用,则继续执行步骤b(重新选择),否则返回第一个选定的Invoker</br>
* b)重新选择,重新选择的验证规则:选中>可用。 该规则确保选定的Invoker有最小机会成为先前选定列表中的一个,并且还确保该Invoker可用。
*/
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.isEmpty())
return null;
String methodName = invocation == null ? "" : invocation.getMethodName();
boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
{
//忽略重载方法
//如果之前有调用过的provider且现已经不存在则设置为null
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
stickyInvoker = null;
}
//忽略并发问题
//如果sticky为true,且之前有调用过的provider且该provider未失败则继续使用该provider
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
if (availablecheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
}
}
}
//负载均衡选择
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
if (sticky) {
stickyInvoker = invoker;
}
return invoker;
}
doSelect方法入参:
loadbalance 负载均衡策略;invocation:当前调用的信息;invoker 调用者候选;selected 排除选择的调用者与否
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.isEmpty())
return null;
//如果invokers子集只有一个就返回这一个
if (invokers.size() == 1)
return invokers.get(0);
// 如果invokers子集只有两个invoker, 并存在调用失败,则循环使用
if (invokers.size() == 2 && selected != null && !selected.isEmpty()) {
return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
}
//如果当前负载均衡为null,则使用默认random
if (loadbalance == null) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
//使用负载均衡策略得到一个invoker
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
//如果selected中包含 或者 availablecheck(决定是否从集群中排除不可用的调用者)为true 则重新选择.
if ((selected != null && selected.contains(invoker))
|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
try {
//重新选择invoker
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (rinvoker != null) {
invoker = rinvoker;
} else {
//重新选择invoker为null,则检查当前所选调用者的索引,如果不是最后一个,则选择index + 1。
int index = invokers.indexOf(invoker);
try {
//避免碰撞
invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
} catch (Exception e) {
logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
}
}
} catch (Throwable t) {
logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
}
}
return invoker;
}
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers == null || invokers.isEmpty())
return null;
if (invokers.size() == 1)
return invokers.get(0);
return doSelect(invokers, url, invocation);
}
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
负载均衡提供的select方法共有三个参数,invokers:可用的服务列表;url:包含consumer的信息;invocation:当前调用的信息。
默认的负载均衡算法为RandomLoadBalance, 随机调度算法。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size();
int totalWeight = 0;
boolean sameWeight = true;
for (int i = 0; i < length; i++) {
//获取每个invoker的权重
int weight = getWeight(invokers.get(i), invocation);
//权重总和
totalWeight += weight;
//如果所有的invoker权重都一样则sameWeight为true,否则false
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
//至少一个调用者的权重> 0 且 并非每个调用者的权重相同
if (totalWeight > 0 && !sameWeight) {
//根据totalWeight随机选择:[0,totalWeight)
int offset = random.nextInt(totalWeight);
//确定随机值对应的invoker
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
//if条件不满足,返回[0,length)随机值
return invokers.get(random.nextInt(length));
}
计算权重的方法
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
//获取provider配置的权重(默认100)
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
//获取provider 启动的时间(默认0)
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
//获取启动时长
int uptime = (int) (System.currentTimeMillis() - timestamp);
//获取预热时间(默认600000,即10分钟),注意warmup不是provider的基本参数,需要通过dubbo:paramater配置
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
if (uptime > 0 && uptime < warmup) {
//如果启动时长小于预热时间:说明还在预热阶段,需要相应的减少权重
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
//权重计算方式为启动时长占预热时间的百分比乘以权重
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
总结来说,负载均衡策略步骤:
1 如果invoker子集只有一个,则直接返回;如果有两个且之前选出的invoker调用失败的集合不为空,则退化为循环
2 使用负载均衡选择一个Invoker,随机调用为例,如果至少一个调用者的权重> 0 且 并非每个调用者的权重相同,则根据totalWeight随机选择[0,totalWeight),并确定随机数对应的invoker,否则返回length随机数[0,length)
3 如果选出的Invoker在先前选择的列表中,或者如果该Invoker不可用,则重新选择,否则返回第一个选定的Invoker
4 重新选择,重新选择的验证规则:选中>可用。 该规则确保选定的Invoker有最小机会成为先前选定列表中的一个,并且还确保该Invoker可用。