Hystrix的Command执行
常用的执行方式有同步execute()和异步queue(),同步的方法也是通过异步get()来实现的queue().get();queue()方法其实只是对核心方法AbstractCommand#toObservable()做了代理处理,下面重点介绍该方法。
1. 用局部变量保存当前引用,初始化终止命令清除回调terminateCommandCleanup和退订回调unsubscribeCommandCleanup,回调会使用原子操作进行设置状态,触发一些统计数据类,生成对象的执行结果ExecutionResult,调用通知和钩子方法等等,
核心执行方法为applyHystrixSemantics,执行启动钩子函数,验证是否允许执行,HystrixCircuitBreakerImpl,判断强制开关是否开启,它的优先级最高,其次是查看链路状态是否为-1,它为原始状态,也就是正常执行状态,如果一段时间内错误执行数超过配置的比例,该值会被重设,会有一个休眠时间,到达之后会放进一个请求,并且设置状态为半开,否则就不允许执行。
public boolean attemptExecution() {
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
if (circuitOpened.get() == -1) {
return true;
} else {
if (isAfterSleepWindow()) {
//only the first request after sleep window should execute
//if the executing command succeeds, the status will transition to CLOSED
//if the executing command fails, the status will transition to OPEN
//if the executing command gets unsubscribed, the status will transition to OPEN
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
return true;
} else {
return false;
}
} else {
return false;
}
}
}
获取信号量执行方式,Hystrix支持两种执行隔离方式,线程和信号量,尝试获取允许executionSemaphore.tryAcquire(),设置开始执行的时间executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());转到executeCommandAndObserve执行,判断properties.executionTimeoutEnabled().get()是否设置了超时时间,executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));回调方法call,设置超时监听器TimerListener,初始化HystrixTimer,设置周期执行线程executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);间接时间为超时时间executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);循环调用tick方法,当原子方法执行成功后,就代表已经超时,发布超时通知退订,结束执行等,正常执行的话会执行onNext方法,设置执行状态
public Subscriber<? super R> call(final Subscriber<? super R> child) {
final CompositeSubscription s = new CompositeSubscription();
// if the child unsubscribes we unsubscribe our parent as well
child.add(s);
//capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();
TimerListener listener = new TimerListener() {
@Override
public void tick() {
// if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
// otherwise it means we lost a race and the run() execution completed or did not start
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
// report timeout failure
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
// shut down the original request
s.unsubscribe();
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
@Override
public void run() {
child.onError(new HystrixTimeoutException());
}
});
timeoutRunnable.run();
//if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
}
}
@Override
public int getIntervalTimeInMilliseconds() {
return originalCommand.properties.executionTimeoutInMilliseconds().get();
}
};
final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
// set externally so execute/queue can see this
originalCommand.timeoutTimer.set(tl);
/**
* If this subscriber receives values it means the parent succeeded/completed
*/
Subscriber<R> parent = new Subscriber<R>() {
@Override
public void onCompleted() {
if (isNotTimedOut()) {
// stop timer and pass notification through
tl.clear();
child.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if (isNotTimedOut()) {
// stop timer and pass notification through
tl.clear();
child.onError(e);
}
}
@Override
public void onNext(R v) {
if (isNotTimedOut()) {
child.onNext(v);
}
}
private boolean isNotTimedOut() {
// if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
}
};
// if s is unsubscribed we want to unsubscribe the parent
s.add(parent);
return parent;
}
2. 隔离执行,线程或者信号量方式,检验一些初始状态,设置初始执行结果,增加线程数,任务执行数,设置执行回调等。
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
//we have not been unsubscribed, so should proceed
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
/**
* If any of these hooks throw an exception, then it appears as if the actual execution threw an error
*/
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
}
执行钩子函数,设置成功回调方法,最后执行Observable.just(run());run方法就是我们创建的自定义Command所必须要实现的方法,也就是我们要实现的功能代码。
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
Observable<R> userObservable;
try {
userObservable = getExecutionObservable();
} catch (Throwable ex) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
userObservable = Observable.error(ex);
}
return userObservable
.lift(new ExecutionHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd));
}
当任务执行成功与否都会执行一些统计函数,决定熔断开关,执行结果等等,
final Action1<R> markEmits = new Action1<R>() {
@Override
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
}
if (commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
circuitBreaker.markSuccess();
}
}
};
3. 前面讲的都是applyHystrixSemantics方法的内容,在他们执行之前会先判断缓存的开关以及是否有缓存。
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
4 当熔断开关打开时,即applyHystrixSemantics#(circuitBreaker.attemptExecution())获取失败,执行handleShortCircuitViaFallback()方法,进而执行getFallbackOrThrowException方法,获取执行允许,判断用户是否自定义了getFallback函数,也就是熔断后的执行业务逻辑,getFallbackObservable()=》Observable.just(getFallback())。另外,当获取执行允许信号量失败时executionSemaphore.tryAcquire()执行handleSemaphoreRejectionViaFallback方法,底层也是调用了getFallbackOrThrowException方法。
if (fallbackSemaphore.tryAcquire()) {
try {
if (isFallbackUserDefined()) {
executionHook.onFallbackStart(this);
fallbackExecutionChain = getFallbackObservable();
} else {
//same logic as above without the hook invocation
fallbackExecutionChain = getFallbackObservable();
}
} catch (Throwable ex) {
//If hook or user-fallback throws, then use that as the result of the fallback lookup
fallbackExecutionChain = Observable.error(ex);
}
return fallbackExecutionChain
.doOnEach(setRequestContext)
.lift(new FallbackHookApplication(_cmd))
.lift(new DeprecatedOnFallbackHookApplication(_cmd))
.doOnNext(markFallbackEmit)
.doOnCompleted(markFallbackCompleted)
.onErrorResumeNext(handleFallbackError)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} else {
return handleFallbackRejectionByEmittingError();
}
最后就是在executeCommandAndObserve方法中执行execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext)设置的失败回调中也设置了相应的失败函数handleThreadPoolRejectionViaFallback,handleTimeoutViaFallback,handleFailureViaFallback。
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};