HystrixCommand与AbstractCommand浅析
因为hystrix的命令执行都是基于RXjava的,所以在看源码之前一定要先了解RXjava。
HystrixCommand继承了AbstractCommand,HystrixCommand主要是提供了更多的构造函数和构造参数建造者以及最终调用run()实现用户业务,而AbstractCommand中就是要实现隔离、熔断等核心功能。下面我主要以注释的形式来分析,如有不对的地方欢迎指正。
1、HystrixCommand
/**
* Used for synchronous execution of command.
*/
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
/**
* Used for asynchronous execution of command.
*/
public Future<R> queue() {
final Future<R> delegate = toObservable().toBlocking().toFuture(); //核心代码,执行体
final Future<R> f = new Future<R>() {
......//这里面是把dalegate委托给f
};
/* special handling of error states that throw immediately */
if (f.isDone()) {
......
}
return f;
}
toObservable().toBlocking().toFuture(),一眼看去以为这里只是在创建一个Future,其实这里是在以defer方式创建Observable,然后在toFuture中去订阅这个Observable,从而触发执行,并返回一个Future来获取结果。
final Future<R> delegate = toObservable().toBlocking().toFuture(); //核心代码,执行体
getExecutionObservable()是最终被调用执行run()的方法。
@Override
final protected Observable<R> getExecutionObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
return Observable.just(run()); //调用run(),执行用户业务
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
// Save thread on which we get subscribed so that we can interrupt it later if needed
executionThread.set(Thread.currentThread());
}
});
}
2、AbstractCommand
先看看AbstractCommand的成员变量。
protected final HystrixCircuitBreaker circuitBreaker; //熔断器
protected final HystrixThreadPool threadPool; //线程隔离使用的线程池
protected final HystrixThreadPoolKey threadPoolKey; //线程池key
protected final HystrixCommandProperties properties; //可配置的属性
protected enum TimedOutStatus {
NOT_EXECUTED, COMPLETED, TIMED_OUT
}
protected enum CommandState {
NOT_STARTED, OBSERVABLE_CHAIN_CREATED, USER_CODE_EXECUTED, UNSUBSCRIBED, TERMINAL
}
protected enum ThreadState {
NOT_USING_THREAD, STARTED, UNSUBSCRIBED, TERMINAL
}
protected final HystrixCommandMetrics metrics; //运行中的指标,统计数据,监控需要的流
protected final HystrixCommandKey commandKey;
protected final HystrixCommandGroupKey commandGroup;
/**
* Plugin implementations
*/
protected final HystrixEventNotifier eventNotifier;
protected final HystrixConcurrencyStrategy concurrencyStrategy;
protected final HystrixCommandExecutionHook executionHook;
/* FALLBACK Semaphore 降级处理的并发量也是有控制的 */
protected final TryableSemaphore fallbackSemaphoreOverride;
/* each circuit has a semaphore to restrict concurrent fallback execution */
protected static final ConcurrentHashMap<String, TryableSemaphore> fallbackSemaphorePerCircuit = new ConcurrentHashMap<String, TryableSemaphore>();
/* END FALLBACK Semaphore */
/* EXECUTION Semaphore 信号量隔离*/
protected final TryableSemaphore executionSemaphoreOverride;
protected static final ConcurrentHashMap<String, TryableSemaphore> executionSemaphorePerCircuit = new ConcurrentHashMap<String, TryableSemaphore>();
/* END EXECUTION Semaphore */
protected final AtomicReference<Reference<TimerListener>> timeoutTimer = new AtomicReference<Reference<TimerListener>>();
protected AtomicReference<CommandState> commandState = new AtomicReference<CommandState>(CommandState.NOT_STARTED);
protected AtomicReference<ThreadState> threadState = new AtomicReference<ThreadState>(ThreadState.NOT_USING_THREAD);
/*
* Examples: RESPONSE_FROM_CACHE, CANCELLED HystrixEventTypes
*/
protected volatile ExecutionResult executionResult = ExecutionResult.EMPTY; //state on shared execution
protected volatile boolean isResponseFromCache = false;
protected volatile ExecutionResult executionResultAtTimeOfCancellation;
protected volatile long commandStartTimestamp = -1L;
/* If this command executed and timed-out */
protected final AtomicReference<TimedOutStatus> isCommandTimedOut = new AtomicReference<TimedOutStatus>(TimedOutStatus.NOT_EXECUTED);
protected volatile Action0 endCurrentThreadExecutingCommand;
/**
* Instance of RequestCache logic
*/
protected final HystrixRequestCache requestCache;
protected final HystrixRequestLog currentRequestLog;
private static ConcurrentHashMap<Class<?>, String> defaultNameCache = new ConcurrentHashMap<Class<?>, String>();
protected static ConcurrentHashMap<HystrixCommandKey, Boolean> commandContainsFallback = new ConcurrentHashMap<HystrixCommandKey, Boolean>();
HystrixCommand调用AbstractCommand的toObservable()方法中就可以看到hystrix执行流程中的缓存检查,以及未使用缓存时后续的执行调用。
/**
* Used for asynchronous execution of command with a callback by subscribing to the {@link Observable}.
*/
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
......
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};
final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
@Override
public R call(R r) {
R afterFirstApplication = r;
try {
afterFirstApplication = executionHook.onComplete(_cmd, r);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
}
try {
return executionHook.onEmit(_cmd, afterFirstApplication);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
return afterFirstApplication;
}
}
};
......
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
/* this is a stateful object so can only be used once */
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
//TODO make a new error type for this
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}
commandStartTimestamp = System.currentTimeMillis();
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
/* try from cache first 首先尝试从缓存中获取结果 */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics) //未使用缓存,则继续调用applyHystrixSemantics
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
}
});
}
下一流程节点,applyHystrixSemantics(final AbstractCommand<R> _cmd),熔断器检查以及尝试信号量隔离检查。
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
executionHook.onStart(_cmd); //执行前的操作
/* determine if we're allowed to execute */
if (circuitBreaker.attemptExecution()) { //熔断器检查
final TryableSemaphore executionSemaphore = getExecutionSemaphore(); //尝试信号量隔离,若是信号量则executionSemaphore是TryableSemaphoreActual类的实例
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
if (executionSemaphore.tryAcquire()) { //若是线程隔离,则executionSemaphore是TryableSemaphoreNoOp类的实例
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd) //继续向下执行命令
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}
final TryableSemaphore executionSemaphore = getExecutionSemaphore();//尝试信号量隔离,若是信号量则executionSemaphore是TryableSemaphoreActual类的实例,若是线程隔离,则executionSemaphore是TryableSemaphoreNoOp类的实例
装饰执行实际执行run()的ObServable。
/**
* This decorates "Hystrix" functionality around the run() Observable.
*/
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
final Action1<R> markEmits = new Action1<R>() {
@Override
public void call(R r) {
......
}
};
final Action0 markOnCompleted = new Action0() {
@Override
public void call() {
......
}
};
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
......
return handleFailureViaFallback(e);
}
}
};
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> rNotification) {
setRequestContextIfNeeded(currentRequestContext);
}
};
Observable<R> execution; //最终执行用户定义的run()方法的Observable
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd),根据设置的隔离模式执行。
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// the command timed out in the wrapping thread so we will return immediately
// and not increment any of the counters below or other such logic
return Observable.error(new RuntimeException("timed out before executing run()"));
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
//we have not been unsubscribed, so should proceed
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
/**
* If any of these hooks throw an exception, then it appears as if the actual execution threw an error
*/
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd); //会调用实现类的该方法获取执行run()的Observable
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//command has already been unsubscribed, so return immediately
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
}
}).doOnTerminate(new Action0() {
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
//if it was never started and received terminal, then no need to clean up (I don't think this is possible)
}
//if it was unsubscribed, then other cleanup handled it
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
//if it was never started and was cancelled, then no need to clean up
}
//if it was terminal, then other cleanup handled it
}
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { //为执行run()的Observable指定调度器,该调度器里面包括了使用的线程池信息,subscribeOn就会根据当前Observable和调度器创建任务,并执行
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
} else {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// semaphore isolated
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd); //会调用实现类的该方法获取执行run()的Observable
} catch (Throwable ex) {
//If the above hooks throw, then use that as the result of the run method
return Observable.error(ex);
}
}
});
}
}
在HystrixCommand中覆盖的getExecutionObservable()方法获得执行run()的Observable。
subscribeOn(threadPool.getScheduler(new Func0<Boolean>(){})),threadPool.getScheduler(new Func0<Boolean>(){})为执行run()的Observable指定调度器,该调度器里面包括了使用的线程池信息,subscribeOn()就会根据当前Observable和获取到的调度器创建任务,任务就提交给线程池执行。
上一篇: Cocoapods私有库搭建
下一篇: Go语言简单源代码剖析