Hystrix工作流程解析
搭建hystrix源码阅读环境
引入依赖
<dependency> <groupid>com.netflix.hystrix</groupid> <artifactid>hystrix-core</artifactid> <version>1.5.12</version> </dependency>
创建command
类
public class hellocommand extends hystrixcommand<string> { public hellocommand() { super(hystrixcommandgroupkey.factory.askey("test")); } @override protected string run() throws exception { thread.sleep(800); return "sucess"; } @override protected string getfallback() { system.out.println("执行了回退方法"); return "error"; } }
创建测试类
public class commandtest { public static void main(string[] args) { hellocommand command = new hellocommand(); string result = command.execute(); system.out.println(result); } }
hystrix工作流程
首先我们看一下上方的这张图,这个图完整的描述了hystrix的工作流程:
1.每次调用都会创建一个hystrixcommand
2.执行execute或queue做同步\异步调用
3.判断熔断器是否打开,如果打开跳到步骤8,否则进入步骤4
4.判断线程池/信号量是否跑满,如果跑满进入步骤8,否则进入步骤5
5.调用hystrixcommand的run方法,如果调用超时进入步骤8
6.判断是否调用成功,返回成功调用结果,如果失败进入步骤8
7.计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态
8.降级处理逻辑,根据上方的步骤可以得出以下四种情况会进入降级处理:
- 熔断器打开
- 线程池/信号量跑满
- 调用超时
- 调用失败
9.返回执行成功结果
创建hystrixcommand
接着我们结合源码看一下这个调用流程,直接执行测试类的main方法,可以看到入口就在execute
方法上
public r execute() { try { return queue().get(); } catch (exception e) { throw exceptions.sneakythrow(decomposeexception(e)); } }
执行同步方法
public future<r> queue() { final future<r> delegate = toobservable().toblocking().tofuture(); //省略。。。 };
接着看toobservable()
方法
public observable<r> toobservable() { //省略。。。 return observable.defer(new func0<observable<r>>() { @override public observable<r> call() { if (!commandstate.compareandset(commandstate.not_started, commandstate.observable_chain_created)) { illegalstateexception ex = new illegalstateexception("this instance can only be executed once. please instantiate a new instance."); //todo make a new error type for this throw new hystrixruntimeexception(failuretype.bad_request_exception, _cmd.getclass(), getlogmessageprefix() + " command executed multiple times - this is not permitted.", ex, null); } commandstarttimestamp = system.currenttimemillis(); if (properties.requestlogenabled().get()) { // log this command execution regardless of what happened if (currentrequestlog != null) { currentrequestlog.addexecutedcommand(_cmd); } } final boolean requestcacheenabled = isrequestcachingenabled(); final string cachekey = getcachekey(); //如果开启请求缓存则查询缓存是否存在 if (requestcacheenabled) { hystrixcommandresponsefromcache<r> fromcache = (hystrixcommandresponsefromcache<r>) requestcache.get(cachekey); if (fromcache != null) { isresponsefromcache = true; return handlerequestcachehitandemitvalues(fromcache, _cmd); } } observable<r> hystrixobservable = observable.defer(applyhystrixsemantics) .map(wrapwithallonnexthooks); observable<r> aftercache; if (requestcacheenabled && cachekey != null) { hystrixcachedobservable<r> tocache = hystrixcachedobservable.from(hystrixobservable, _cmd); hystrixcommandresponsefromcache<r> fromcache = (hystrixcommandresponsefromcache<r>) requestcache.putifabsent(cachekey, tocache); if (fromcache != null) { // another thread beat us so we'll use the cached value instead tocache.unsubscribe(); isresponsefromcache = true; return handlerequestcachehitandemitvalues(fromcache, _cmd); } else { // we just created an observablecommand so we cast and return it aftercache = tocache.toobservable(); } } else { aftercache = hystrixobservable; } return aftercache .doonterminate(terminatecommandcleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) .doonunsubscribe(unsubscribecommandcleanup) // perform cleanup once .dooncompleted(fireoncompletedhook); } }); }
在上面这个方法中会有一个缓存的判断,如果存在缓存的话直接返回结果,否则进入方法applyhystrixsemantics
方法
判断熔断器和线程池以及信号量
private observable<r> applyhystrixsemantics(final abstractcommand<r> _cmd) { executionhook.onstart(_cmd); /* determine if we're allowed to execute */ //判断是否开启熔断器 if (circuitbreaker.attemptexecution()) { //获取信号量实例 final tryablesemaphore executionsemaphore = getexecutionsemaphore(); final atomicboolean semaphorehasbeenreleased = new atomicboolean(false); final action0 singlesemaphorerelease = new action0() { @override public void call() { if (semaphorehasbeenreleased.compareandset(false, true)) { executionsemaphore.release(); } } }; final action1<throwable> markexceptionthrown = new action1<throwable>() { @override public void call(throwable t) { eventnotifier.markevent(hystrixeventtype.exception_thrown, commandkey); } }; //尝试获取信号量 if (executionsemaphore.tryacquire()) { try { /* used to track userthreadexecutiontime */ executionresult = executionresult.setinvocationstarttime(system.currenttimemillis()); return executecommandandobserve(_cmd) .doonerror(markexceptionthrown) .doonterminate(singlesemaphorerelease) .doonunsubscribe(singlesemaphorerelease); } catch (runtimeexception e) { return observable.error(e); } } else { //拒绝 return handlesemaphorerejectionviafallback(); } } else { //失败 return handleshortcircuitviafallback(); } }
在applyhystrixsemantics
方法中,首先会判断是否开启熔断器,如果开启则直接进入失败处理的逻辑。否则会尝试获取信号量(如果使用的是线程池的模式则默认获取成功),获取成功进入executecommandandobserve
方法
判断超时
private observable<r> executecommandandobserve(final abstractcommand<r> _cmd) { final hystrixrequestcontext currentrequestcontext = hystrixrequestcontext.getcontextforcurrentthread(); //省略... //判断是否开启超时设置 if (properties.executiontimeoutenabled().get()) { //list添加超时操作 execution = executecommandwithspecifiedisolation(_cmd) .lift(new hystrixobservabletimeoutoperator<r>(_cmd)); } else { execution = executecommandwithspecifiedisolation(_cmd); }
这里如果设置超时时间的话则会加上一个超时的处理,接着看executecommandwithspecifiedisolation
方法
private observable<r> executecommandwithspecifiedisolation(final abstractcommand<r> _cmd) { if (properties.executionisolationstrategy().get() == executionisolationstrategy.thread) { return observable.defer(new func0<observable<r>>() { @override public observable<r> call() { executionresult = executionresult.setexecutionoccurred(); if (!commandstate.compareandset(commandstate.observable_chain_created, commandstate.user_code_executed)) { return observable.error(new illegalstateexception("execution attempted while in state : " + commandstate.get().name())); } metrics.markcommandstart(commandkey, threadpoolkey, executionisolationstrategy.thread); if (iscommandtimedout.get() == timedoutstatus.timed_out) { return observable.error(new runtimeexception("timed out before executing run()")); } if (threadstate.compareandset(threadstate.not_using_thread, threadstate.started)) { hystrixcounters.incrementglobalconcurrentthreads(); threadpool.markthreadexecution(); // store the command that is being run endcurrentthreadexecutingcommand = hystrix.startcurrentthreadexecutingcommand(getcommandkey()); executionresult = executionresult.setexecutedinthread(); try { executionhook.onthreadstart(_cmd); executionhook.onrunstart(_cmd); executionhook.onexecutionstart(_cmd); return getuserexecutionobservable(_cmd); } catch (throwable ex) { return observable.error(ex); } } else { return observable.empty(); } } }).doonterminate(new action0() { @override public void call() { if (threadstate.compareandset(threadstate.started, threadstate.terminal)) { handlethreadend(_cmd); } if (threadstate.compareandset(threadstate.not_using_thread, threadstate.terminal)) { } } }).doonunsubscribe(new action0() { @override public void call() { if (threadstate.compareandset(threadstate.started, threadstate.unsubscribed)) { handlethreadend(_cmd); } if (threadstate.compareandset(threadstate.not_using_thread, threadstate.unsubscribed)) { } } }).subscribeon(threadpool.getscheduler(new func0<boolean>() { @override public boolean call() { return properties.executionisolationthreadinterruptontimeout().get() && _cmd.iscommandtimedout.get() == timedoutstatus.timed_out; } })); } else { return observable.defer(new func0<observable<r>>() { @override public observable<r> call() { executionresult = executionresult.setexecutionoccurred(); if (!commandstate.compareandset(commandstate.observable_chain_created, commandstate.user_code_executed)) { return observable.error(new illegalstateexception("execution attempted while in state : " + commandstate.get().name())); } metrics.markcommandstart(commandkey, threadpoolkey, executionisolationstrategy.semaphore); // semaphore isolated // store the command that is being run endcurrentthreadexecutingcommand = hystrix.startcurrentthreadexecutingcommand(getcommandkey()); try { executionhook.onrunstart(_cmd); executionhook.onexecutionstart(_cmd); return getuserexecutionobservable(_cmd); //the getuserexecutionobservable method already wraps sync exceptions, so this shouldn't throw } catch (throwable ex) { //if the above hooks throw, then use that as the result of the run method return observable.error(ex); } } }); } }
这段代码比较长,具体的执行逻辑为:
- 进入方法会首先判断隔离策略,如果是使用的信号量模式则在当前线程上执行,否则进入下方的线程池逻辑
- 更改hystrixcommand的状态为user_code_executed
- 判断hystrixcommand的超时状态,如果超时则抛出异常
- 更改当前command的线程执行状态为started
- 调用getuserexecutionobservable执行具体的业务逻辑,也就是我们实现的那个run方法
- doonterminate:执行完毕后更改线程状态为terminal
- doonunsubscribe:当observable被取消订阅,更改线程状态为terminal
- subscribeon:指定scheduler
上一篇: SpringCloud之Spring Cloud Stream:消息驱动
下一篇: 读取字节流。