上一节中讲到了HystrixCommand有四种执行方法,这一节就来说一下这四种方法直接的关系以及他们的实现。并发
execute方法使用同步方式获取结果,本质是调用了queue方法获取了一个Future,而后经过该Future获取返回结果。app
public R execute() { try { return queue().get(); } catch (Exception e) { throw Exceptions.sneakyThrow(decomposeException(e)); } }
queue方法经过异步方式活动结果,本质是经过toObservable方法获取了一个Observable,而后经过该Observable得到一个Future异步
public Future<R> queue() { ... final Future<R> delegate = toObservable().toBlocking().toFuture(); ... };
observe方法经过Observable实现异步,而且该Observable不管在任什么时候候监听,均可以接收到全部消息。本质是使用了一个ReplaySubject监听toObservable返回的Observable,而后再返回该ReplaySubject。ide
public Observable<R> observe() {
ReplaySubject<R> subject = ReplaySubject.create(); final Subscription sourceSubscription = toObservable().subscribe(subject); ... return subject.doOnUnsubscribe(new Action0() { @Override public void call() { sourceSubscription.unsubscribe(); } }); }
toObservable方法Observable实现异步,在Observable被监听时执行。本质是建立一个Observable,当这个Observable被监听时,执行run命令,并返回消息。ui
public Observable<R> toObservable() { ... return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { ... Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) //会调用run方法 .map(wrapWithAllOnNextHooks); ... } }); }
命令执行流程:spa
命令执行流程主要包含一下内容:线程
1.熔断器判断code
2.各个阶段HystrixCommandExecutionHook调用。blog
3.各个阶段ip
4.发送metrics消息
执行流程:
1.执行HystrixCommandExecutionHook的onStart。
2.经过熔断器判断服务是否熔断
3.若是没有熔断
3.1若是是SEMAPHORE模式
3.1.1 判断是否到达指定并发量,若是不是进入
3.1.1.1 向metrics发送执行开始消息。
3.1.1.2执行HystrixCommandExecutionHook的onRunStart、onExecutionStart方法
3.1.1.3执行run方法,并把执行结果返回Observable。
3.1.1.4监听Observable执行结果消息,若是返回消息是success、error、emit执行HystrixCommandExecutionHook相应方法。
3.1.3 [handleSemaphoreRejectionViaFallback]
3.2若是是THREAD模式
3.2.1 向metrics发送执行开始消息。
3.2.2 向metrics发送线程池执行开始消息。
3.2.3 执行HystrixCommandExecutionHook的onThreadStart、onRunStart、onExecutionStart方法
3.2.4 执行run方法,并把执行结果返回Observable。
3.2.5 监听Observable执行结果消息,若是返回消息是success、error、emit执行HystrixCommandExecutionHook相应方法。
3.2.6 若是线程执行过程当中,发送TERMINAL,UNSUBSCRIBED,执行HystrixCommandExecutionHook的onThreadComplete方法,并向metrics发送线程池执行结束消息。
2.3若是timeout检测启动,则启动定时器判断timeout检测。
4.若是熔断[handleShortCircuitViaFallback]
命令执行状态
hystrix命令执行过程当中有如下状态,hystrix命令的状态是不可逆的,每一个hystrix命令只能使用一次:
1. 初始状态(NOT_STARTED)。
2.建立(OBSERVABLE_CHAIN_CREATED),toObservable开始执行后变为OBSERVABLE_CHAIN_CREATED状态
3.用户代码执行( USER_CODE_EXECUTED),用户调用开始执行前变为USER_CODE_EXECUTED
4.UNSUBSCRIBED,
5.TERMINAL
hystrix命令执行过程当中线程有一下状态:
1. 初始状态(NOT_USING_THREAD)。
2.开始(STARTED),经过线程池方式执行命令前变为STARTED。
3.UNSUBSCRIBED,
4.TERMINAL