欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Hystrix工作流程解析

程序员文章站 2023-10-28 22:54:58
搭建Hystrix源码阅读环境 引入依赖 创建 类 创建测试类 Hystrix工作流程 首先我们看一下上方的这张图,这个图完整的描述了Hystrix的工作流程: 1.每次调用都会创建一个HystrixCommand 2.执行execute或queue做同步\异步调用 3.判断熔断器是否打开,如果打开 ......
搭建hystrix源码阅读环境

引入依赖

        <dependency>
            <groupid>com.netflix.hystrix</groupid>
            <artifactid>hystrix-core</artifactid>
            <version>1.5.12</version>
        </dependency>

创建command

public class hellocommand extends hystrixcommand<string> {

    public hellocommand() {
        super(hystrixcommandgroupkey.factory.askey("test"));
    }

    @override
    protected string run() throws exception {
        thread.sleep(800);
        return "sucess";
    }

    @override
    protected string getfallback() {
        system.out.println("执行了回退方法");
        return "error";
    }

}

创建测试类

public class commandtest {
    public static void main(string[] args) {
        hellocommand command = new hellocommand();
        string result = command.execute();
        system.out.println(result);
    }
}
hystrix工作流程

Hystrix工作流程解析

首先我们看一下上方的这张图,这个图完整的描述了hystrix的工作流程:
1.每次调用都会创建一个hystrixcommand
2.执行execute或queue做同步\异步调用
3.判断熔断器是否打开,如果打开跳到步骤8,否则进入步骤4
4.判断线程池/信号量是否跑满,如果跑满进入步骤8,否则进入步骤5
5.调用hystrixcommand的run方法,如果调用超时进入步骤8
6.判断是否调用成功,返回成功调用结果,如果失败进入步骤8
7.计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态
8.降级处理逻辑,根据上方的步骤可以得出以下四种情况会进入降级处理:

  1. 熔断器打开
  2. 线程池/信号量跑满
  3. 调用超时
  4. 调用失败

9.返回执行成功结果

创建hystrixcommand

接着我们结合源码看一下这个调用流程,直接执行测试类的main方法,可以看到入口就在execute方法上

    public r execute() {
        try {
            return queue().get();
        } catch (exception e) {
            throw exceptions.sneakythrow(decomposeexception(e));
        }
    }
执行同步方法
public future<r> queue() {
        final future<r> delegate = toobservable().toblocking().tofuture();
        //省略。。。
};

接着看toobservable()方法

 public observable<r> toobservable() {
        //省略。。。
        return observable.defer(new func0<observable<r>>() {
            @override
            public observable<r> call() {
                if (!commandstate.compareandset(commandstate.not_started, commandstate.observable_chain_created)) {
                    illegalstateexception ex = new illegalstateexception("this instance can only be executed once. please instantiate a new instance.");
                    //todo make a new error type for this
                    throw new hystrixruntimeexception(failuretype.bad_request_exception, _cmd.getclass(), getlogmessageprefix() + " command executed multiple times - this is not permitted.", ex, null);
                }

                commandstarttimestamp = system.currenttimemillis();

                if (properties.requestlogenabled().get()) {
                    // log this command execution regardless of what happened
                    if (currentrequestlog != null) {
                        currentrequestlog.addexecutedcommand(_cmd);
                    }
                }

                final boolean requestcacheenabled = isrequestcachingenabled();
                final string cachekey = getcachekey();
                //如果开启请求缓存则查询缓存是否存在
                if (requestcacheenabled) {
                    hystrixcommandresponsefromcache<r> fromcache = (hystrixcommandresponsefromcache<r>) requestcache.get(cachekey);
                    if (fromcache != null) {
                        isresponsefromcache = true;
                        return handlerequestcachehitandemitvalues(fromcache, _cmd);
                    }
                }

                observable<r> hystrixobservable =
                        observable.defer(applyhystrixsemantics)
                                .map(wrapwithallonnexthooks);

                observable<r> aftercache;

                if (requestcacheenabled && cachekey != null) { 
                    hystrixcachedobservable<r> tocache = hystrixcachedobservable.from(hystrixobservable, _cmd);
                    hystrixcommandresponsefromcache<r> fromcache = (hystrixcommandresponsefromcache<r>) requestcache.putifabsent(cachekey, tocache);
                    if (fromcache != null) {
                        // another thread beat us so we'll use the cached value instead
                        tocache.unsubscribe();
                        isresponsefromcache = true;
                        return handlerequestcachehitandemitvalues(fromcache, _cmd);
                    } else {
                        // we just created an observablecommand so we cast and return it
                        aftercache = tocache.toobservable();
                    }
                } else {
                    aftercache = hystrixobservable;
                }

                return aftercache
                        .doonterminate(terminatecommandcleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                        .doonunsubscribe(unsubscribecommandcleanup) // perform cleanup once
                        .dooncompleted(fireoncompletedhook);
            }
        });
    }

在上面这个方法中会有一个缓存的判断,如果存在缓存的话直接返回结果,否则进入方法applyhystrixsemantics方法

判断熔断器和线程池以及信号量
private observable<r> applyhystrixsemantics(final abstractcommand<r> _cmd) {
        executionhook.onstart(_cmd);

        /* determine if we're allowed to execute */
        //判断是否开启熔断器
        if (circuitbreaker.attemptexecution()) {
            //获取信号量实例
            final tryablesemaphore executionsemaphore = getexecutionsemaphore();
            final atomicboolean semaphorehasbeenreleased = new atomicboolean(false);
            final action0 singlesemaphorerelease = new action0() {
                @override
                public void call() {
                    if (semaphorehasbeenreleased.compareandset(false, true)) {
                        executionsemaphore.release();
                    }
                }
            };

            final action1<throwable> markexceptionthrown = new action1<throwable>() {
                @override
                public void call(throwable t) {
                    eventnotifier.markevent(hystrixeventtype.exception_thrown, commandkey);
                }
            };
            //尝试获取信号量
            if (executionsemaphore.tryacquire()) {
                try {
                    /* used to track userthreadexecutiontime */
                    executionresult = executionresult.setinvocationstarttime(system.currenttimemillis());
                    return executecommandandobserve(_cmd)
                            .doonerror(markexceptionthrown)
                            .doonterminate(singlesemaphorerelease)
                            .doonunsubscribe(singlesemaphorerelease);
                } catch (runtimeexception e) {
                    return observable.error(e);
                }
            } else {
                //拒绝
                return handlesemaphorerejectionviafallback();
            }
        } else {
            //失败
            return handleshortcircuitviafallback();
        }
    }

applyhystrixsemantics方法中,首先会判断是否开启熔断器,如果开启则直接进入失败处理的逻辑。否则会尝试获取信号量(如果使用的是线程池的模式则默认获取成功),获取成功进入executecommandandobserve方法

判断超时
private observable<r> executecommandandobserve(final abstractcommand<r> _cmd) {
        final hystrixrequestcontext currentrequestcontext = hystrixrequestcontext.getcontextforcurrentthread();
        //省略...

        //判断是否开启超时设置
        if (properties.executiontimeoutenabled().get()) {
           //list添加超时操作
            execution = executecommandwithspecifiedisolation(_cmd)
                    .lift(new hystrixobservabletimeoutoperator<r>(_cmd));
        } else {
            execution = executecommandwithspecifiedisolation(_cmd);
        }

这里如果设置超时时间的话则会加上一个超时的处理,接着看executecommandwithspecifiedisolation方法

private observable<r> executecommandwithspecifiedisolation(final abstractcommand<r> _cmd) {
        if (properties.executionisolationstrategy().get() == executionisolationstrategy.thread) {
            return observable.defer(new func0<observable<r>>() {
                @override
                public observable<r> call() {
                    executionresult = executionresult.setexecutionoccurred();
                    if (!commandstate.compareandset(commandstate.observable_chain_created, commandstate.user_code_executed)) {
                        return observable.error(new illegalstateexception("execution attempted while in state : " + commandstate.get().name()));
                    }
                    metrics.markcommandstart(commandkey, threadpoolkey, executionisolationstrategy.thread);

                    if (iscommandtimedout.get() == timedoutstatus.timed_out) {
                        return observable.error(new runtimeexception("timed out before executing run()"));
                    }
                    if (threadstate.compareandset(threadstate.not_using_thread, threadstate.started)) {
                        hystrixcounters.incrementglobalconcurrentthreads();
                        threadpool.markthreadexecution();
                        // store the command that is being run
                        endcurrentthreadexecutingcommand = hystrix.startcurrentthreadexecutingcommand(getcommandkey());
                        executionresult = executionresult.setexecutedinthread();

                        try {
                            executionhook.onthreadstart(_cmd);
                            executionhook.onrunstart(_cmd);
                            executionhook.onexecutionstart(_cmd);
                            return getuserexecutionobservable(_cmd);
                        } catch (throwable ex) {
                            return observable.error(ex);
                        }
                    } else {
                        return observable.empty();
                    }
                }
            }).doonterminate(new action0() {
                @override
                public void call() {
                    if (threadstate.compareandset(threadstate.started, threadstate.terminal)) {
                        handlethreadend(_cmd);
                    }
                    if (threadstate.compareandset(threadstate.not_using_thread, threadstate.terminal)) {
                    }
                }
            }).doonunsubscribe(new action0() {
                @override
                public void call() {
                    if (threadstate.compareandset(threadstate.started, threadstate.unsubscribed)) {
                        handlethreadend(_cmd);
                    }
                    if (threadstate.compareandset(threadstate.not_using_thread, threadstate.unsubscribed)) {
                    }
                }
            }).subscribeon(threadpool.getscheduler(new func0<boolean>() {
                @override
                public boolean call() {
                    return properties.executionisolationthreadinterruptontimeout().get() && _cmd.iscommandtimedout.get() == timedoutstatus.timed_out;
                }
            }));
        } else {
            return observable.defer(new func0<observable<r>>() {
                @override
                public observable<r> call() {
                    executionresult = executionresult.setexecutionoccurred();
                    if (!commandstate.compareandset(commandstate.observable_chain_created, commandstate.user_code_executed)) {
                        return observable.error(new illegalstateexception("execution attempted while in state : " + commandstate.get().name()));
                    }

                    metrics.markcommandstart(commandkey, threadpoolkey, executionisolationstrategy.semaphore);
                    // semaphore isolated
                    // store the command that is being run
                    endcurrentthreadexecutingcommand = hystrix.startcurrentthreadexecutingcommand(getcommandkey());
                    try {
                        executionhook.onrunstart(_cmd);
                        executionhook.onexecutionstart(_cmd);
                        return getuserexecutionobservable(_cmd);  //the getuserexecutionobservable method already wraps sync exceptions, so this shouldn't throw
                    } catch (throwable ex) {
                        //if the above hooks throw, then use that as the result of the run method
                        return observable.error(ex);
                    }
                }
            });
        }
    }

这段代码比较长,具体的执行逻辑为:

  1. 进入方法会首先判断隔离策略,如果是使用的信号量模式则在当前线程上执行,否则进入下方的线程池逻辑
  2. 更改hystrixcommand的状态为user_code_executed
  3. 判断hystrixcommand的超时状态,如果超时则抛出异常
  4. 更改当前command的线程执行状态为started
  5. 调用getuserexecutionobservable执行具体的业务逻辑,也就是我们实现的那个run方法
  6. doonterminate:执行完毕后更改线程状态为terminal
  7. doonunsubscribe:当observable被取消订阅,更改线程状态为terminal
  8. subscribeon:指定scheduler