首发于 http://otzh.ml
Hystrix已经不在维护了,可是成功的开源项目老是值得学习的.刚开始看 Hystrix 源码时,会发现一堆 Action,Function 的逻辑,这其实就是 RxJava 的特色了--响应式编程.上篇文章已经对RxJava做过入门介绍,不熟悉的同窗能够先去看看.本文会简单介绍 Hystrix,再根据demo结合源码来了解Hystrix的执行流程.html
Hystrix 是一个延迟和容错库,旨在隔离对远程系统、服务和第三方库的访问点,中止级联故障,并在错误不可避免的复杂分布式系统中可以弹性恢复。java
核心概念react
Command 是Hystrix的入口,对用户来讲,咱们只须要建立对应的 command,将须要保护的接口包装起来就能够.能够无需关注再以后的逻辑.与 Spring 深度集成后还能够经过注解的方式,就更加对开发友好了.git
断路器,是从电气领域引伸过来的概念,具备过载、短路和欠电压保护功能,有保护线路和电源的能力.在Hystrix中即为当请求超过必定比例响应失败时,hystrix 会对请求进行拦截处理,保证服务的稳定性,以及防止出现服务之间级联雪崩的可能性.github
隔离策略是 Hystrix 的设计亮点所在,利用舱壁模式的思想来对访问的资源进行隔离,每一个资源是独立的依赖,单个资源的异常不该该影响到其余. Hystrix 的隔离策略目前有两种:线程池隔离,信号量隔离.编程
Hystrix的运行流程缓存
官方的 How it Works 对流程有很详细的介绍,图示清晰,相信看完流程图就能对运行流程有必定的了解.
HystrixCommand
是标准的命令模式实现,每一次请求即为一次命令的建立执行经历的过程.从上述Hystrix流程图能够看出建立流程最终会指向toObservable
,在以前RxJava入门时有介绍到Observable
即为被观察者,做用是发送数据给观察者进行相应的,所以能够知道这个方法应该是较为关键的.app
HystrixCommand
设计的接口,主要提供执行命令的抽象方法,例如:execute()
,queue()
,observe()
Observable
设计的接口,主要提供自动订阅(observe()
)和生成Observable(toObservable()
)的抽象方法run()
)经过新建一个 command 来看 Hystrix 是如何建立并执行的.HystrixCommand 是一个抽象类,其中有一个run
方法须要咱们实现本身的业务逻辑,如下是偷懒采用匿名内部类的形式呈现.构造方法的内部实现咱们就不关注了,直接看下执行的逻辑吧.less
HystrixCommand demo = new HystrixCommand<String>(HystrixCommandGroupKey.Factory.asKey("demo-group")) { @Override protected String run() { return "Hello World~"; } }; demo.execute();
这是官方给出的一次完整调用的链路.上述的 demo 中咱们直接调用了execute
方法,因此调用的路径为execute() -> queue() -> toObservable() -> toBlocking() -> toFuture() -> get()
.核心的逻辑其实就在toObservable()
中.分布式
execute
方法为同步调用返回结果,并对异常做处理.内部会调用queue
// 同步调用执行 public R execute() { try { // queue()返回的是Future类型的对象,因此这里是阻塞get return queue().get(); } catch (Exception e) { throw decomposeException(e); } }
queue
的第一行代码完成了核心的订阅逻辑.
toObservable()
生成了 Hystrix 的 Observable 对象Observable
转换为 BlockingObservable
能够阻塞控制数据发送toFuture
实现对 BlockingObservable
的订阅public Future<R> queue() { // 着重关注的是这行代码 // 完成了Observable的建立及订阅 // toBlocking()是将Observable转为BlockingObservable,转换后的Observable能够阻塞数据的发送 final Future<R> delegate = toObservable().toBlocking().toFuture(); final Future<R> f = new Future<R>() { // 因为toObservable().toBlocking().toFuture()返回的Future若是中断了, // 不会对当前线程进行中断,因此这里将返回的Future进行了再次包装,处理异常逻辑 ... } // 判断是否已经结束了,有异常则直接抛出 if (f.isDone()) { try { f.get(); return f; } catch (Exception e) { // 省略这段判断 } } return f; }
// 被包装的Observable private final Observable<? extends T> o; // toBlocking()会调用该静态方法将 源Observable简单包装成BlockingObservable public static <T> BlockingObservable<T> from(final Observable<? extends T> o) { return new BlockingObservable<T>(o); } public Future<T> toFuture() { return BlockingOperatorToFuture.toFuture((Observable<T>)o); }
ReactiveX 关于toFuture的解读The
toFuture
operator applies to theBlockingObservable
subclass, so in order to use it, you must first convert your source Observable into aBlockingObservable
by means of either theBlockingObservable.from
method or theObservable.toBlocking
operator.
toFuture
只能做用于BlockingObservable
因此也才会有上文想要转换为BlockingObservable的操做
// 该操做将 源Observable转换为返回单个数据项的Future public static <T> Future<T> toFuture(Observable<? extends T> that) { // CountDownLatch 判断是否完成 final CountDownLatch finished = new CountDownLatch(1); // 存储执行结果 final AtomicReference<T> value = new AtomicReference<T>(); // 存储错误结果 final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); // single()方法能够限制Observable只发送单条数据 // 若是有多条数据 会抛 IllegalArgumentException // 若是没有数据能够发送 会抛 NoSuchElementException @SuppressWarnings("unchecked") final Subscription s = ((Observable<T>)that).single().subscribe(new Subscriber<T>() { // single()返回的Observable就能够对其进行标准的处理了 @Override public void onCompleted() { finished.countDown(); } @Override public void onError(Throwable e) { error.compareAndSet(null, e); finished.countDown(); } @Override public void onNext(T v) { // "single" guarantees there is only one "onNext" value.set(v); } }); // 最后将Subscription返回的数据封装成Future,实现对应的逻辑 return new Future<T>() { // 能够查看源码 }; }
AbstractCommand
是toObservable
实现的地方,属于Hystrix的核心逻辑,代码较长,能够和方法调用的流程图一块儿食用.toObservable
主要是完成缓存和建立Observable,requestLog的逻辑,当第一次建立Observable时,applyHystrixSemantics
方法是Hystrix的语义实现,能够跳着看.
tips: 下文中有不少 Action和 Function,他们很类似,都有call方法,可是区别在于Function有返回值,而Action没有,方法后跟着的数字表明有几个入参.Func0/Func3即没有入参和有三个入参
toObservable
代码较长且分层仍是清晰的,因此下面一块一块写.其逻辑和文章开始提到的Hystrix流程图是彻底一致的.
public Observable<R> toObservable() { final AbstractCommand<R> _cmd = this; // 此处省略掉了不少个Action和Function,大部分是来作扫尾清理的函数,因此用到的时候再说 // defer在上篇rxjava入门中提到过,是一种建立型的操做符,每次订阅时会产生新的Observable,回调方法中所实现的才是真正咱们须要的Observable return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { // 校验命令的状态,保证其只执行一次 if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); //TODO make a new error type for this throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null); } commandStartTimestamp = System.currentTimeMillis(); // properties为当前command的全部属性 // 容许记录请求log时会保存当前执行的command if (properties.requestLogEnabled().get()) { // log this command execution regardless of what happened if (currentRequestLog != null) { currentRequestLog.addExecutedCommand(_cmd); } } // 是否开启了请求缓存 final boolean requestCacheEnabled = isRequestCachingEnabled(); // 获取缓存key final String cacheKey = getCacheKey(); // 开启缓存后,尝试从缓存中取 if (requestCacheEnabled) { HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } // 没有开启请求缓存时,就执行正常的逻辑 Observable<R> hystrixObservable = // 这里又经过defer建立了咱们须要的Observable Observable.defer(applyHystrixSemantics) // 发送前会先走一遍hook,默认executionHook是空实现的,因此这里就跳过了 .map(wrapWithAllOnNextHooks); // 获得最后的封装好的Observable后,将其放入缓存 if (requestCacheEnabled && cacheKey != null) { // wrap it for caching HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); if (fromCache != null) { // another thread beat us so we'll use the cached value instead toCache.unsubscribe(); isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } else { // we just created an ObservableCommand so we cast and return it afterCache = toCache.toObservable(); } } else { afterCache = hystrixObservable; } return afterCache // 终止时的操做 .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) // 取消订阅时的操做 .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once // 完成时的操做 .doOnCompleted(fireOnCompletedHook); } }
缓存击中时的处理
private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache<R> fromCache, final AbstractCommand<R> _cmd) { try { // Hystrix中有大量的hook 若是有心作二次开发的,能够利用这些hook作到很完善的监控 executionHook.onCacheHit(this); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx); } // 将缓存的结果赋给当前command return fromCache.toObservableWithStateCopiedInto(this) // doOnTerminate 或者是后面看到的doOnUnsubscribe,doOnError,都指的是在响应onTerminate/onUnsubscribe/onError后的操做,即在Observable的生命周期上注册一个动做优雅的处理逻辑 .doOnTerminate(new Action0() { @Override public void call() { // 命令最终状态的不一样进行不一样处理 if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) { cleanUpAfterResponseFromCache(false); //user code never ran } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) { cleanUpAfterResponseFromCache(true); //user code did run } } }) .doOnUnsubscribe(new Action0() { @Override public void call() { // 命令最终状态的不一样进行不一样处理 if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) { cleanUpAfterResponseFromCache(false); //user code never ran } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) { cleanUpAfterResponseFromCache(true); //user code did run } } }); }
由于本片文章的主要目的是在讲执行流程,因此失败回退和断路器相关的就留到之后的文章中再写.
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { @Override public Observable<R> call() { // 再也不订阅了就返回不发送数据的Observable if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { // 不发送任何数据或通知 return Observable.never(); } return applyHystrixSemantics(_cmd); } }; private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // 标记开始执行的hook // 若是hook内抛异常了,会快速失败且没有fallback处理 executionHook.onStart(_cmd); /* determine if we're allowed to execute */ // 断路器核心逻辑: 判断是否容许执行(TODO) if (circuitBreaker.allowRequest()) { // Hystrix本身造的信号量轮子,之因此不用juc下,官方解释为juc的Semphore实现太复杂,并且没有动态调节的信号量大小的能力,简而言之,不知足需求! // 根据不一样隔离策略(线程池隔离/信号量隔离)获取不一样的TryableSemphore final TryableSemaphore executionSemaphore = getExecutionSemaphore(); // Semaphore释放标志 final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); // 释放信号量的Action final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; // 异常处理 final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { // HystrixEventNotifier是hystrix的插件,不一样的事件发送不一样的通知,默认是空实现. eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; // 线程池隔离的TryableSemphore始终为true if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ // executionResult是一次命令执行的结果信息封装 // 这里设置起始时间是为了记录命令的生命周期,执行过程当中会set其余属性进去 executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) // 报错时的处理 .doOnError(markExceptionThrown) // 终止时释放 .doOnTerminate(singleSemaphoreRelease) // 取消订阅时释放 .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { // tryAcquire失败后会作fallback处理,TODO return handleSemaphoreRejectionViaFallback(); } } else { // 断路器短路(拒绝请求)fallback处理 TODO return handleShortCircuitViaFallback(); } }
/** * 执行run方法的地方 */ private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { // 获取当前上下文 final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); // 发送数据时的Action响应 final Action1<R> markEmits = new Action1<R>() { @Override public void call(R r) { // 若是onNext时须要上报时,作如下处理 if (shouldOutputOnNextEvents()) { // result标记 executionResult = executionResult.addEvent(HystrixEventType.EMIT); // 通知 eventNotifier.markEvent(HystrixEventType.EMIT, commandKey); } // commandIsScalar是一个我不解的地方,在网上也没有查到好的解释 // 该方法为抽象方法,有HystrixCommand实现返回true.HystrixObservableCommand返回false if (commandIsScalar()) { // 耗时 long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); // 通知 eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList()); eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); // 断路器标记成功(断路器半开时的反馈,决定是否关闭断路器) circuitBreaker.markSuccess(); } } }; final Action0 markOnCompleted = new Action0() { @Override public void call() { if (!commandIsScalar()) { // 同markEmits 相似处理 } } }; // 失败回退的逻辑 final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { // 不是重点略过了 } }; // 请求上下文的处理 final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() { @Override public void call(Notification<? super R> rNotification) { setRequestContextIfNeeded(currentRequestContext); } }; Observable<R> execution; // 若是有执行超时限制,会将包装后的Observable再转变为支持TimeOut的 if (properties.executionTimeoutEnabled().get()) { // 根据不一样的隔离策略包装为不一样的Observable execution = executeCommandWithSpecifiedIsolation(_cmd) // lift 是rxjava中一种基本操做符 能够将Observable转换成另外一种Observable // 包装为带有超时限制的Observable .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }
根据不一样的隔离策略建立不一样的执行Observable
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE) return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { // 因为源码太长,这里只关注正常的流程,须要详细了解能够去看看源码 if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { try { return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } } else { //command has already been unsubscribed, so return immediately return Observable.error(new RuntimeException("unsubscribed before executing run()")); } }}) .doOnTerminate(new Action0() {}) .doOnUnsubscribe(new Action0() {}) // 指定在某一个线程上执行,是rxjava中很重要的线程调度的概念 .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { })); } else { // 信号量隔离策略 return Observable.defer(new Func0<Observable<R>>() { // 逻辑与线程池大体相同 }); } }
获取用户执行的逻辑
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) { Observable<R> userObservable; try { // getExecutionObservable是抽象方法,有HystrixCommand自行实现 userObservable = getExecutionObservable(); } catch (Throwable ex) { // the run() method is a user provided implementation so can throw instead of using Observable.onError // so we catch it here and turn it into Observable.error userObservable = Observable.error(ex); } // 将Observable做其余中转 return userObservable .lift(new ExecutionHookApplication(_cmd)) .lift(new DeprecatedOnRunHookApplication(_cmd)); }
lift操做符
lift能够转换成一个新的Observable,它很像一个代理,将原来的Observable代理到本身这里,订阅时通知原来的Observable发送数据,经本身这里流转加工处理再返回给订阅者.Map/FlatMap
操做符底层其实就是用的lift
进行实现的.
@Override final protected Observable<R> getExecutionObservable() { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { try { // just操做符就是直接执行的Observable // run方法就是咱们实现的业务逻辑: Hello World~ return Observable.just(run()); } catch (Throwable ex) { return Observable.error(ex); } } }).doOnSubscribe(new Action0() { @Override public void call() { // 执行订阅时将执行线程记为当前线程,必要时咱们能够interrupt executionThread.set(Thread.currentThread()); } }); }
但愿本身能把埋下的坑一一填完: 容错机制,metrics,断路器等等...