A Brief Analysis of Sentinel's Flow-Control and Circuit-Breaking Source Code
2022-03-10 08:38:30
I. Basic Usage (the foundation for the source analysis)
1. Add the Sentinel dependency
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
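2. Annotate the Controller method
// blockHandlerFunc and fallbackFunc (not shown) must be defined in the same class,
// because no blockHandlerClass/fallbackClass is specified
@SentinelResource(value = "/order/create", blockHandler = "blockHandlerFunc", fallback = "fallbackFunc")
@GetMapping("/test-sentinel-resource")
public String testSentinelResource(@RequestParam(required = false) String a)
        throws InterruptedException {
    // Simulate the time taken by the protected business logic
    Thread.sleep(100);
    return a;
}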
II. Annotation Parsing: the Aspect
The aspect class SentinelResourceAspect intercepts every method annotated with @SentinelResource and interprets the annotation:
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

    // Pointcut: methods annotated with @SentinelResource
    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }

    // Around advice
    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        Method originMethod = resolveMethod(pjp);
        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        if (annotation == null) {
            // Should not go through here.
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }
        String resourceName = getResourceName(annotation.value(), originMethod);
        EntryType entryType = annotation.entryType();
        int resourceType = annotation.resourceType();
        Entry entry = null;
        try {
            entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
            Object result = pjp.proceed();
            return result;
        } catch (BlockException ex) {
            return handleBlockException(pjp, annotation, ex);
        } catch (Throwable ex) {
            Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
            // The ignore list will be checked first.
            if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                throw ex;
            }
            if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                traceException(ex, annotation);
                return handleFallback(pjp, annotation, ex);
            }
            throw ex;
        } finally {
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }
    }
}
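III. How It Is Handled: try/catch
One large try/catch wraps the target method. SphU.entry runs before the method; if a rule check fails there, it throws a BlockException, which prevents the method from running and thereby enforces flow control or circuit breaking. After the method runs, the outcome is recorded and handled according to how it completed (the exceptionsToIgnore check is left out here):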
try {
    entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
    Object result = pjp.proceed();
    return result;
} catch (BlockException ex) {
    return handleBlockException(pjp, annotation, ex);
} catch (Throwable ex) {
    if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
        traceException(ex, annotation);
        return handleFallback(pjp, annotation, ex);
    }
    throw ex;
} finally {
    if (entry != null) {
        entry.exit(1, pjp.getArgs());
    }
}
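The control flow above can be imitated with plain JDK classes. The following is only a minimal sketch and not Sentinel code: GuardSketch, BlockedSketchException, and the concurrency cap are hypothetical stand-ins for SphU.entry, BlockException, and a real rule. Unlike the aspect, the sketch sends every business exception to the fallback instead of consulting exceptionsToTrace.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical guard that mimics the aspect's try/catch/finally shape. */
class GuardSketch {

    /** Hypothetical stand-in for Sentinel's BlockException. */
    static class BlockedSketchException extends Exception {
        BlockedSketchException(String message) {
            super(message);
        }
    }

    private final int maxConcurrent;
    private final AtomicInteger inFlight = new AtomicInteger();

    GuardSketch(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
    }

    /** Plays the role of SphU.entry: throws when the (toy) rule is violated. */
    private void enter() throws BlockedSketchException {
        if (inFlight.incrementAndGet() > maxConcurrent) {
            inFlight.decrementAndGet();
            throw new BlockedSketchException("too many concurrent calls");
        }
    }

    /** Plays the role of entry.exit: releases what enter() acquired. */
    private void exit() {
        inFlight.decrementAndGet();
    }

    String call(Callable<String> business) {
        boolean entered = false;
        try {
            enter();                  // rule check happens before the business logic
            entered = true;
            return business.call();   // analogous to pjp.proceed()
        } catch (BlockedSketchException ex) {
            return "blocked: " + ex.getMessage();   // analogous to handleBlockException
        } catch (Exception ex) {
            return "fallback: " + ex.getMessage();  // analogous to handleFallback
        } finally {
            if (entered) {
                exit();               // always release, even when the business logic failed
            }
        }
    }

    int inFlight() {
        return inFlight.get();
    }

    public static void main(String[] args) {
        GuardSketch guard = new GuardSketch(1);
        System.out.println(guard.call(() -> "ok"));
        System.out.println(guard.call(() -> {
            throw new IllegalStateException("boom");
        }));
    }
}
```

The point of the finally block is the same as in the aspect: whatever was acquired on entry is released exactly once, and only if entry actually succeeded.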
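IV. How Does SphU.entry Implement Rule Checking and Recording?
Start from this line of the code above:
entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
Stepping into it repeatedly leads to the entryWithPriority method: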
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
    // ... omitted
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
    if (chain == null) {
        return new CtEntry(resourceWrapper, null, context);
    }
    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}
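The method does two main things: lookProcessChain obtains a "chain", and then the chain's entry method is called. In effect, lookProcessChain returns a series of ProcessorSlot objects that are executed one after another, which is the chain-of-responsibility design pattern.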
4.1 lookProcessChain
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry size limit.
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }
                chain = SlotChainProvider.newSlotChain();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}
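Double-checked locking (DCL), the idiom familiar from lazy singletons, is used so that a ProcessorSlotChain is created only once per resource and then served from chainMap. The key line is: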
chain = SlotChainProvider.newSlotChain();
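The locking scheme in lookProcessChain combines double-checked locking with a copy-on-write map. As a rough, self-contained sketch of that idea (ChainCacheSketch, its size limit parameter, and the Supplier factory are hypothetical names; marking the map field volatile is this sketch's way of publishing the new map safely to lock-free readers):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical per-key cache that mirrors the structure of lookProcessChain. */
class ChainCacheSketch<K, V> {

    private final Object lock = new Object();
    private final int maxSize;          // plays the role of Constants.MAX_SLOT_CHAIN_SIZE
    private final Supplier<V> factory;  // plays the role of SlotChainProvider.newSlotChain()

    // Readers never lock; they only ever see a fully built map.
    private volatile Map<K, V> map = new HashMap<>();

    ChainCacheSketch(int maxSize, Supplier<V> factory) {
        this.maxSize = maxSize;
        this.factory = factory;
    }

    V lookup(K key) {
        V value = map.get(key);              // first check, without the lock
        if (value == null) {
            synchronized (lock) {
                value = map.get(key);        // second check, under the lock
                if (value == null) {
                    if (map.size() >= maxSize) {
                        return null;         // size limit reached: no value for this key
                    }
                    value = factory.get();
                    // Copy-on-write: build a new map and swap the reference,
                    // so the map that readers hold is never mutated.
                    Map<K, V> newMap = new HashMap<>(map.size() + 1);
                    newMap.putAll(map);
                    newMap.put(key, value);
                    map = newMap;
                }
            }
        }
        return value;
    }

    int size() {
        return map.size();
    }

    public static void main(String[] args) {
        ChainCacheSketch<String, StringBuilder> cache = new ChainCacheSketch<>(2, StringBuilder::new);
        // The same key always yields the same instance.
        System.out.println(cache.lookup("a") == cache.lookup("a"));
    }
}
```

Replacing the whole map instead of mutating it keeps the common path (a key that already exists) free of locking, at the cost of copying the map each time a new key is added.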
Following the source shows that the chain is created through a builder (the builder pattern). The builder's build method looks like this:
public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    // Append the slots below to form a linked list; they are executed in this order on each call
    chain.addLast(new NodeSelectorSlot());
    chain.addLast(new ClusterBuilderSlot());
    chain.addLast(new LogSlot());
    chain.addLast(new StatisticSlot());
    chain.addLast(new SystemSlot());
    chain.addLast(new AuthoritySlot());
    chain.addLast(new FlowSlot());
    chain.addLast(new DegradeSlot());
    return chain;
}
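That settles it: the "execution chain" is made up of these slots. The core ones are StatisticSlot (statistics), FlowSlot (flow control), and DegradeSlot (degradation).
4.2 chain.entry executes each Slot in turn
Back to the entryWithPriority method from earlier: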
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
    // ... omitted
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
    if (chain == null) {
        return new CtEntry(resourceWrapper, null, context);
    }
    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}
Once the chain has been obtained, the entry method of each Slot is executed in turn.
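To make "each Slot's entry runs in turn" concrete, here is a minimal chain-of-responsibility sketch in plain Java. It is not Sentinel's implementation: SlotChainSketch, NamedSlot, LimitSlot, and the trace list are hypothetical, and the real slots take a context, a resource wrapper, and more parameters. Only the overall shape (addLast to link slots, fireEntry to pass control on, an exception to stop the chain) follows the code shown above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified slot chain built as a singly linked list. */
class SlotChainSketch {

    /** Hypothetical stand-in for Sentinel's BlockException. */
    static class BlockedException extends Exception {
        BlockedException(String message) {
            super(message);
        }
    }

    abstract static class Slot {
        private Slot next;

        abstract void entry(String resource, int count, List<String> trace) throws BlockedException;

        /** Hands control to the next slot, if there is one. */
        void fireEntry(String resource, int count, List<String> trace) throws BlockedException {
            if (next != null) {
                next.entry(resource, count, trace);
            }
        }
    }

    /** A slot that only records that it ran, then passes control on. */
    static class NamedSlot extends Slot {
        private final String name;

        NamedSlot(String name) {
            this.name = name;
        }

        @Override
        void entry(String resource, int count, List<String> trace) throws BlockedException {
            trace.add(name);
            fireEntry(resource, count, trace);
        }
    }

    /** A slot that stops the chain by throwing when count exceeds its limit. */
    static class LimitSlot extends Slot {
        private final int limit;

        LimitSlot(int limit) {
            this.limit = limit;
        }

        @Override
        void entry(String resource, int count, List<String> trace) throws BlockedException {
            trace.add("limit");
            if (count > limit) {
                throw new BlockedException("count " + count + " exceeds limit " + limit);
            }
            fireEntry(resource, count, trace);
        }
    }

    // Dummy head node, so addLast never has to special-case an empty chain.
    private final Slot first = new Slot() {
        @Override
        void entry(String resource, int count, List<String> trace) throws BlockedException {
            fireEntry(resource, count, trace);
        }
    };
    private Slot end = first;

    void addLast(Slot slot) {
        end.next = slot;
        end = slot;
    }

    void entry(String resource, int count, List<String> trace) throws BlockedException {
        first.entry(resource, count, trace);
    }

    public static void main(String[] args) throws BlockedException {
        SlotChainSketch chain = new SlotChainSketch();
        chain.addLast(new NamedSlot("statistic"));
        chain.addLast(new LimitSlot(1));
        chain.addLast(new NamedSlot("degrade"));
        List<String> trace = new ArrayList<>();
        chain.entry("/order/create", 1, trace);
        System.out.println(trace);
    }
}
```

When a slot throws, the slots after it never run, which is how a single failed rule check in the chain prevents the protected method from executing.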
V. StatisticSlot (statistics)
To be added later.
VI. FlowSlot (flow control)
To be added later.
VII. DegradeSlot (degradation)
To be added later.
Article URL: https://blog.csdn.net/Q3838418/article/details/112194774