电路熔断器(Circuit Breaker)自我思考
链接:https://zhuanlan.zhihu.com/p/23711137
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
在我们的工程实践中,偶尔会遇到一些服务由于网络连接超时,系统有异常或load过高出现暂时不可用等情况,导致对这些服务的调用失败,可能需要一段时间才能修复,这种对请求的阻塞可能会占用宝贵的系统资源,如:内存,线程,数据库连接等等,最坏的情况下会导致这些资源被消耗殆尽,使得系统里不相关的部分所使用的资源也耗尽从而拖累整个系统。在这种情况下,调用操作能够立即返回错误而不是等待超时的发生或者重试可能是一种更好的选择,只有当被调用的服务有可能成功时我们再去尝试。
熔断器模式可以防止我们的系统不断地尝试执行可能会失败的调用,使得我们的系统继续执行而不用等待修正错误,或者浪费CPU时间去等到长时间的超时产生。熔断器模式也可以使我们系统能够检测错误是否已经修正,如果已经修正,系统会再次尝试调用操作。下图是个使用熔断器模式的调用流程:
可以从图中看出,当超时出现的次数达到一定条件后,熔断器会触发打开状态,客户端的下次调用将直接返回,不用等待超时产生。
在熔断器内部,往往有以下几种状态:
1)闭合(closed)状态:该状态下能够对目标服务或方法进行正常的调用。熔断器类维护了一个时间窗口内调用失败的次数,如果某次调用失败,则失败次数加1。如果最近失败次数超过了在给定的时间窗口内允许失败的阈值(可以是数量也可以是比例),则熔断器类切换到断开(Open)状态。此时熔断器设置了一个计时器,当时钟超过了该时间,则切换到半断开(Half-Open)状态,该睡眠时间的设定是给了系统一次机会来修正导致调用失败的错误。
2)断开(Open)状态:在该状态下,对目标服务或方法的请求会立即返回错误响应,如果设置了fallback方法,则会进入fallback的流程。
3)半断开(Half-Open)状态:允许对目标服务或方法的一定数量的请求可以去调用服务。如果这些请求对服务的调用成功,那么可以认为之前导致调用失败的错误已经修正,此时熔断器切换到闭合状态(并且将错误计数器重置);如果这一定数量的请求有调用失败的情况,则认为导致之前调用失败的问题仍然存在,熔断器切回到断开方式,然后开始重置计时器来给系统一定的时间来修正错误。半断开状态能够有效防止正在恢复中的服务被突然而来的大量请求再次拖垮。
- 方案一:基于spring AOP的切面设计
@Component @Aspect public class CircuitBreakerAdvice implements BeanFactoryAware { private static final Logger LOG = LoggerFactory.getLogger(CircuitBreakerAdvice.class); @Autowired private BeanFactory beanFactory; public CircuitBreakerAdvice() { LOG.debug("CircuitBreaker created"); } @Around("@annotation(ch.tom.cb.CircuitBreaker)") public Object intercept(ProceedingJoinPoint point) throws Throwable { CircuitBreakerConfig circuitBreakerConfig = getCircuitBreaker(point); try { return callTargetMethod(point, circuitBreakerConfig); } catch (Throwable e) { if (!circuitBreakerConfig.isExceptionExcluded(e)) { LOG.debug("closing circuit due to received exception"); circuitBreakerConfig.getCircuitBreakerController().setState(CircuitBreakerState.CLOSED); } throw e; } } private Object callTargetMethod(ProceedingJoinPoint point, CircuitBreakerConfig circuitBreakerConfig) throws Throwable { MethodSignature signature = (MethodSignature) point.getSignature(); Method method = signature.getMethod(); CircuitBreakerController controller = circuitBreakerConfig.getCircuitBreakerController(); if (isMyCallTheOneThatReopensTheCircuit(circuitBreakerConfig) || circuitBreakerConfig.getCircuitBreakerController().isStateOpen()) { LOG.debug("interception method {} and state is open proceeding.", method.getName()); if (circuitBreakerConfig.getServiceTimeout() == null){ Object returnValue = point.proceed(); if (circuitBreakerConfig.getCircuitBreakerController().isStateHalfOpen()){ circuitBreakerConfig.getCircuitBreakerController().setState(CircuitBreakerState.OPEN); } return returnValue; }else { FutureTask circuitBreakerTimeoutFuture = new FutureTask(new CircuitBreakerTimeoutFuture(point)); Object returnValue = null; try { controller.getTimeoutExecutor().execute(circuitBreakerTimeoutFuture); returnValue = circuitBreakerTimeoutFuture.get(circuitBreakerConfig.getServiceTimeout().toMillis(), TimeUnit.MILLISECONDS); if (circuitBreakerConfig.getCircuitBreakerController().isStateHalfOpen()){ circuitBreakerConfig.getCircuitBreakerController().setState(CircuitBreakerState.OPEN); } return returnValue; }catch (TimeoutException toe){ circuitBreakerTimeoutFuture.cancel(true); LOG.debug("timeout occured on method {} closing circuit.", method.getName()); circuitBreakerConfig.getCircuitBreakerController().setState(CircuitBreakerState.CLOSED); return returnFromFallBackService(point,circuitBreakerConfig); } } } else { return returnFromFallBackService(point, circuitBreakerConfig); } } private boolean isMyCallTheOneThatReopensTheCircuit(CircuitBreakerConfig config){ if (config.getCircuitBreakerController().isForced()){ return false; } synchronized (config){ if (config.getCircuitBreakerController().isStateClosed() && config.getCircuitBreakerController().isMinClosedTimeReached(config.getMinClosedDuration())){ //ok try this call. config.getCircuitBreakerController().setState(CircuitBreakerState.HALFOPEN); LOG.debug("try to reopen circuit with one call"); return true; } } return false; } private Object returnFromFallBackService(ProceedingJoinPoint point, CircuitBreakerConfig config) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { MethodSignature signature = (MethodSignature) point.getSignature(); Method method = signature.getMethod(); Object closedProvider = config.getClosedServiceProvider(); if (closedProvider == null){ LOG.debug("interception method {} and state is open proceeding.", method.getName()); throw new CircuitBreakerException(); }else { LOG.debug("interception method {} and state is closed call fallback.", method.getName()); Method closedProviderMethod = closedProvider.getClass().getMethod(method.getName(), method.getParameterTypes()); return closedProviderMethod.invoke(closedProvider, point.getArgs()); } } private CircuitBreakerConfig getCircuitBreaker(ProceedingJoinPoint point) throws NoSuchMethodException { CircuitBreaker circuitBreaker = null; MethodSignature signature = (MethodSignature) point.getSignature(); //method can be an interface. Method method = signature.getMethod(); Method concreteMethod = point.getTarget().getClass().getMethod(method.getName(), method.getParameterTypes()); circuitBreaker = concreteMethod.getAnnotation(CircuitBreaker.class); CircuitBreakerConfig config = (CircuitBreakerConfig) beanFactory.getBean(circuitBreaker.config()); if (config == null) { throw new IllegalStateException(String.format("CircuitBreakerConfig with name %s could not be found", circuitBreaker.config())); } return config; } @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; } }
上一篇: sql创建视图全文搜索[完整版]