专栏系列文章:SpringCloud系列专栏java
系列文章:web
SpringCloud 源码系列(1)— 注册中心Eureka 之 启动初始化编程
SpringCloud 源码系列(2)— 注册中心Eureka 之 服务注册、续约缓存
SpringCloud 源码系列(3)— 注册中心Eureka 之 抓取注册表markdown
SpringCloud 源码系列(4)— 注册中心Eureka 之 服务下线、故障、自我保护机制并发
SpringCloud 源码系列(5)— 注册中心Eureka 之 EurekaServer集群app
SpringCloud 源码系列(6)— 注册中心Eureka 之 总结篇负载均衡
SpringCloud 源码系列(7)— 负载均衡Ribbon 之 RestTemplate异步
SpringCloud 源码系列(8)— 负载均衡Ribbon 之 核心原理ide
SpringCloud 源码系列(9)— 负载均衡Ribbon 之 核心组件与配置
SpringCloud 源码系列(10)— 负载均衡Ribbon 之 HTTP客户端组件
SpringCloud 源码系列(11)— 负载均衡Ribbon 之 重试与总结篇
SpringCloud 源码系列(12)— 服务调用Feign 之 基础使用篇
SpringCloud 源码系列(13)— 服务调用Feign 之 扫描@FeignClient注解接口
SpringCloud 源码系列(14)— 服务调用Feign 之 构建@FeignClient接口动态代理
SpringCloud 源码系列(15)— 服务调用Feign 之 结合Ribbon进行负载均衡请求
SpringCloud 源码系列(16)— 熔断器Hystrix 之 基础入门篇
这一章咱们从 HystrixCommand 的构造以及 execute() 执行为入口,一步步分析下 Hystrix 如何封装业务逻辑、线程池隔离模式、熔断降级的核心原理。
须要注意的是,Hystrix 源码大量使用了 rxjava 响应式编程,源码中充斥着大量的回调,以及 Observable 层层嵌套,源码运行流程并非线性的,所以在分析源码的过程当中,我会只展现核心的一些源码,便于咱们梳理出 Hystrix 的设计便可。
HystrixCommand 是一个抽象类,又继承自抽象类 AbstractCommand,核心的逻辑都在 AbstractCommand
中,HystrixCommand 相对来讲比较简单,主要就是重载了几个方法,因此咱们先看下 HystrixCommand 组件的结构。
看 HystrixCommand 的类结构图,注意看左边的标识,能够得出以下信息:
run()
方法是一个抽象方法,须要子类实现,也就是咱们的业务逻辑都封装到 run() 方法中execute()
、queue()
是 public 的方法,用于执行命令getExecutionObservable()
、getFallbackObservable()
是 final 修饰的,不可被重载,见名知意,getExecutionObservable() 是父类用于获取执行命令 Observable 的,getFallbackObservable() 是父类用于获取回调方法 Observable 的。getFallback()
、getFallbackMethodName()
都是 protected 的,能够被子类重载。getExecutionObservable()
是实现的父类 AbstractCommand 的抽象方法,经过名称或源码能够知道,这个方法就是获取 run() 方法订阅对象 Observable 的,调用命令时最终是必定要走到这个方法的。
@Override
final protected Observable<R> getExecutionObservable() {
// defer:对象被订阅时才触发 call() 方法的调用
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
// 订阅此 Observable 对象将触发 run() 方法的执行
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
// 订阅时将当前线程保存起来
executionThread.set(Thread.currentThread());
}
});
}
复制代码
一样的,getFallbackObservable()
则是用于获取回调方法的订阅对象 Observable。
@Override
final protected Observable<R> getFallbackObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
// 返回一个 执行回调方法 的订阅对象
return Observable.just(getFallback());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
});
}
复制代码
HystrixCommand 一共有四个方法能够调用来执行命令,分别是:execute()、queue()、observe()、toObservable()
,看源码能够了解到,前面三个方法最终都是调用 toObservable() 方法来实现的。observe()、toObservable() 都是返回一个 Observable 对象,调用 .toBlocking()
方法就会触发订阅对象的执行,而 toFuture()
返回的 Future 就能够异步执行,再调用 Future 的 get()
方法就能够同步阻塞等待执行结果,所以最终就实现了四个方法的不一样特性。
public R execute() {
return queue().get();
}
public Future<R> queue() {
// 全部的调用最终都到了 toObservable()
final Future<R> delegate = toObservable().toBlocking().toFuture();
// 代理,在 delegate 执行异常后作一些处理
final Future<R> f = new Future<R>() {
//...
@Override
public boolean isDone() {
return delegate.isDone();
}
@Override
public R get() throws InterruptedException, ExecutionException {
return delegate.get();
}
//...
};
/* special handling of error states that throw immediately */
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
//....
}
}
return f;
}
复制代码
HystrixCommand 的构造初始化最终都是在父类 AbstractCommand 中参数最齐全的这个构造方法,能够看到这个构造方法有不少参数,但只有第一个参数 HystrixCommandGroupKey 是必输的,即分组名称,其他的参数为空时将使用默认值。
HystrixCommand 的初始化过程比较简单,主要就是初始化了 command 一些配置以及组件。
HystrixCommandProperties
这个类。hystrix-{groupKey}-{number}
。// 除了 group 参数必须外,其它都是非必须参数
protected AbstractCommand(HystrixCommandGroupKey group, // 组名 HystrixCommandKey key, // 命令名 HystrixThreadPoolKey threadPoolKey, // 线程池名 HystrixCircuitBreaker circuitBreaker, // 断路器 HystrixThreadPool threadPool, // Hystrix 线程池 HystrixCommandProperties.Setter commandPropertiesDefaults, // 配置 HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, // 线程池配置 HystrixCommandMetrics metrics, // 度量器 TryableSemaphore fallbackSemaphore, // 信号量回调 TryableSemaphore executionSemaphore, // 信号量限流器 HystrixPropertiesStrategy propertiesStrategy, // Hystrix配置策略组件 HystrixCommandExecutionHook executionHook) { // Hook 跟踪 Hystrix 命令执行
// 命令分组
this.commandGroup = initGroupKey(group);
// key 为 null 时取类名:getClass().getSimpleName()
this.commandKey = initCommandKey(key, getClass());
// Command 配置,默认为 commandPropertiesDefaults
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
// 先使用配置的 poolKeyOverride,不然 threadPoolKey 为空 则使用 groupKey
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
// 度量统计组件:HystrixCommandMetrics
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
// 断路器:HystrixCircuitBreaker
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
// Hystrix线程池:HystrixThreadPool
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
//Strategies from plugins
// 时间通知器:HystrixEventNotifier
this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
// 并发策略:HystrixConcurrencyStrategy
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
// Hook:HystrixCommandExecutionHook
this.executionHook = initExecutionHook(executionHook);
// 请求缓存:HystrixRequestCache
this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
// 请求日志记录:HystrixRequestLog,默认为 null
this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
// 信号量回调覆盖
this.fallbackSemaphoreOverride = fallbackSemaphore;
// 信号量覆盖
this.executionSemaphoreOverride = executionSemaphore;
}
复制代码
以断路器 HystrixCircuitBreaker 的初始化为例,看看这些组件是如何初始化的。
初始化步骤基本都是相似的,若是 AbstractCommand 构造参数传入的组件为 null,就会初始化默认组件。每一个组件会有一个内部类 Factory
,Factory 提供了一个 getInstance
方法来获取组件。Factory 会用一个 ConcurrentHashMap 来缓存不一样的 command 对应的组件,避免重复建立,getInstance() 在获取组件时,先从本地缓存中获取,不存在则建立默认的组件,并放入本地缓存中。
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor, HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
if (enabled) {
if (fromConstructor == null) {
// 从 Factory 中获取默认的 断路器
return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
} else {
return fromConstructor;
}
} else {
// 禁用断路器就返回什么都不操做的实现类
return new NoOpCircuitBreaker();
}
}
复制代码
HystrixCircuitBreaker.Factory:
public static class Factory {
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
// 先从本地缓存中找是否已经建立了同名的断路器组件
HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
if (previouslyCached != null) {
return previouslyCached;
}
// 尚未建立过就建立一个默认的
HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
if (cbForCommand == null) {
return circuitBreakersByCommand.get(key.name());
} else {
return cbForCommand;
}
}
}
复制代码
AbstractCommand 的 toObservable()
方法返回命令执行的最终订阅对象,但它内部对 run()
方法的执行封装可谓是层层嵌套,很是复杂,所以咱们抓大放小,下面先从总体上来看看 toObservable 返回订阅对象 Observable 的流程,一直到最后找到是在哪里执行 run()
方法的。
大体流程以下:
定义命令执行结束后的回调动做 Action0 => terminateCommandCleanup。
定义命令取消执行后的回调动做 Action0 => unsubscribeCommandCleanup。
定义应用 hystrix 的核心语意 Func0 => applyHystrixSemantics,之因此展现这个回调方法的实现,是由于这个回调会去封装 run() 方法。
定义一个转换Hook Func1 => wrapWithAllOnNextHooks。
定义Hook完成后的回调 => fireOnCompletedHook。
最后一步才是在建立 Observable 订阅对象,看下这个订阅对象主要作什么:
applyHystrixSemantics
返回的订阅对象toObservable() 总结一下,其实最核心的就两个地方:
applyHystrixSemantics
才是封装核心业务逻辑的地方;public Observable<R> toObservable() {
// _cmd => 当前命令对象
final AbstractCommand<R> _cmd = this;
// 命令执行结束后的一些动做
final Action0 terminateCommandCleanup = new Action0() {...};
// 命令取消执行的一些动做
final Action0 unsubscribeCommandCleanup = new Action0() {...};
// 应用 Hystrix 的核心语意部分,hystrix 执行入口
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
// 执行 hystrix 请求
return applyHystrixSemantics(_cmd);
}
};
// 对原始命令作一些转换
final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {...};
// 执行成功后的动做
final Action0 fireOnCompletedHook = new Action0() {...};
// defer: 不会当即执行,调用 toBlocking() 后才执行 call() 方法
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 设置状态为 OBSERVABLE_CHAIN_CREATED,若是初始状态不是 NOT_STARTED,将抛出异常
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
throw new HystrixRuntimeException(...);
}
// 设置命令开始时间(注意是在真正订阅时才会设置)
commandStartTimestamp = System.currentTimeMillis();
// 是否开启请求日志,默认为 currentRequestLog 为 null
if (properties.requestLogEnabled().get() && currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
// 是否开启请求缓存,且 cacheKey 不为空
final boolean requestCacheEnabled = isRequestCachingEnabled();
// 要开启请求缓存须要重载 getCacheKey() 方法
final String cacheKey = getCacheKey();
// 首先从缓存中取
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
// 直接返回缓存中的数据
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
// 订阅 applyHystrixSemantics 返回的订阅对象
Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// 开启请求缓存的状况下,再次订阅 hystrixObservable,在执行结束后将请求结果缓存起来
if (requestCacheEnabled && cacheKey != null) {
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
// 订阅对象放入缓存中
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
// 缓存已存在则取消订阅,返回缓存中的内容
if (fromCache != null) {
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
afterCache = toCache.toObservable();
}
}
// 未开启缓存就是本来的 hystrixObservable
else {
afterCache = hystrixObservable;
}
// 返回订阅对象
return afterCache
.doOnTerminate(terminateCommandCleanup) // 命令结束后执行
.doOnUnsubscribe(unsubscribeCommandCleanup) // 命令取消后执行
.doOnCompleted(fireOnCompletedHook); // 命令完成后执行
}
});
}
复制代码
前面看到 applyHystrixSemantics(_cmd)
是封装 Hystrix 命令的语意,但看下来发现,其实还没看到 run() 的封装,下面看下它主要作了哪些事情。
首先发出一个Hook,告知命令开始执行了。
而后用断路器判断是否容许请求,若是断路器拒绝了,好比断路器状态为打开状态,就直接走降级。
若是断路器容许请求,获取一个信号量 TryableSemaphore,若是是信号量模式返回的是 TryableSemaphoreActual;线程池模式返回的是 TryableSemaphoreNoOp,什么都不作,直接放行。
定义了在请求结束后是否信号量许可证的动做 Action0 => singleSemaphoreRelease。
定义了抛出异常后发出通知的动做 Action1 => markExceptionThrown。
以后获取信号量许可证,获取失败就会进入信号量拒绝降级
获取到信号量许可证后,就设置开始执行的时间
最后,经过 executeCommandAndObserve(_cmd)
方法再次订阅,并设置了错误回调、结束回调、取消执行的回调
总结一下,applyHystrixSemantics(_cmd)
最核心的应该就是应用断路器
或信号量
来对请求进行限流。
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// 命令开始执行
executionHook.onStart(_cmd);
// 断路器是否容许请求
if (circuitBreaker.allowRequest()) {
// 获取 Semaphore,信号量模式返回 TryableSemaphoreActual,线程池模式返回 TryableSemaphoreNoOp
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
// 请求结束后释放 信号量许可证
final Action0 singleSemaphoreRelease = new Action0() {...};
// 抛出异常后发出通知
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {...};
// 获取 信号量许可证,线程池模式始终返回 true
if (executionSemaphore.tryAcquire()) {
try {
// 设置执行开始时间,这应该才是命令真正开始执行的时间
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
// 执行命令并订阅
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown) // 错误后执行
.doOnTerminate(singleSemaphoreRelease) // 结束后执行
.doOnUnsubscribe(singleSemaphoreRelease); // 取消执行后执行
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
// 信号量拒绝 => 降级
return handleSemaphoreRejectionViaFallback();
}
} else {
// 短路 => 降级
return handleShortCircuitViaFallback();
}
}
复制代码
applyHystrixSemantics(_cmd)
中又调用了 executeCommandAndObserve(_cmd)
获取订阅对象,这个方法总体看下来也比较简单,下面来看下。
首先也是建立了几个回调对象
核心的是在最后几步,调用 executeCommandWithSpecifiedIsolation(_cmd)
获取一个订阅对象,看方法名称应该就是应用Hystrix的隔离策略。
若是启用了执行超时,订阅对象还会增长一个超时处理器 HystrixObservableTimeoutOperator
,进去能够发现,这个处理器建立了一个 TimerListener
去更改 isCommandTimedOut
的状态为超时,这块就和前面对应上了。超时相关的咱们后面再分析。
仍是总结下,executeCommandAndObserve(_cmd)
最核心的应该就是,若是启用了超时,就给订阅对象增长一个超时处理器来响应超时。
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
// 标记命令已经执行
final Action1<R> markEmits = new Action1<R>() {...};
// 标记命令执行结束
final Action0 markOnCompleted = new Action0() {...};
// 处理回调
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {...};
// 设置当前线程
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {...};
Observable<R> execution;
// 建立 run() 方法的 Observable,根据配置的隔离策略继续封装,线程池隔离模式会放到线程池中去调度,信号量模式则直接返回
if (properties.executionTimeoutEnabled().get()) {
// 超时会由 HystrixObservableTimeoutOperator 处理,抛出 HystrixTimeoutException 超时异常
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
// 设置订阅回调
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
复制代码
接着来看 executeCommandWithSpecifiedIsolation(_cmd)
方法,看方法名称就知道是将 command 放到配置的隔离策略下去执行。能够看到方法内首先一个 if...else...分红线程池隔离和信号量隔离。
线程池隔离模式:
首先将命令的状态由 OBSERVABLE_CHAIN_CREATED 更改成 USER_CODE_EXECUTED,表示用户代码开始执行。
经过 isCommandTimedOut
判断命令是否超时,超时就抛出异常,还未执行就超时。这种状况是有可能发生的,好比线程池已经满了,command 在队列中等待,等待的过程当中超时了。先记住 isCommandTimedOut
这个东西,它是在其它地方进行设置的。
Hystrix.startCurrentThreadExecutingCommand(getCommandKey())
看起来是开始执行命令,但它内部其实只是将 HystrixCommandKey 放到了一个栈的栈顶,返回的 endCurrentThreadExecutingCommand 则是在命令执行结束后 将 HystrixCommandKey 从栈顶弹出。目前还不清楚有什么做用。
经过 getUserExecutionObservable(_cmd)
获取用户执行的订阅对象,这个方法才是最终封装 run() 方法返回的订阅对象。
线程池隔离和信号量隔离最大的区别在最后一步,线程池隔离会有个订阅 subscribeOn(Scheduler scheduler)
,这个 scheduler 是调用 threadPool.getScheduler(Func0 func)
获取的一个 rx.Scheduler
对象,实际类型是 HystrixContextScheduler
。能够猜想一下就是将 call() 返回的 Observable 对象扔到 Scheduler 里进行异步调度,因此这里将是线程池隔离
的入口。
信号量隔离相对来讲就简单不少,最后一步一样经过 getUserExecutionObservable(_cmd)
获取 run() 方法的订阅对象。
总结一下,这个方法最核心的就是返回 run()
的订阅对象,并根据命令的隔离策略进行资源隔离。线程池隔离时会订阅到一个 Scheculer 中进行调度执行,能够猜测到内部最终会扔到一个线程池中去执行。
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
// 线程池隔离
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 状态由 OBSERVABLE_CHAIN_CREATED 更改成 USER_CODE_EXECUTED,表示 run() 方法的代码开始执行
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException(...));
}
// 统计,命令开始执行
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
// 判断命令是否执行超时,isCommandTimedOut 在 TimerListener 中更新
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
return Observable.error(new RuntimeException("timed out before executing run()"));
}
// 增长执行线程数
HystrixCounters.incrementGlobalConcurrentThreads();
// 统计线程开始执行
threadPool.markThreadExecution();
// 将命令Key push 到一个栈的栈顶
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
// 发出一些Hook通知...
// 获取 run() 的 Observable
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnTerminate(new Action0() {
// 命令执行结束后更新线程状态...
}).doOnUnsubscribe(new Action0() {
// 命令取消执行后更新线程状态...
})
// 将 defer 返回的 Observable 放到一个调度器中异步执行,调度器 ==> HystrixContextScheduler
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
// 判断是否中断线程执行,超时后中断执行
return properties.executionIsolationThreadInterruptOnTimeout().get() &&
_cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
}
// 信号量隔离
else {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 状态由 OBSERVABLE_CHAIN_CREATED 更改成 USER_CODE_EXECUTED,表示 run() 方法的代码开始执行
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException(...));
}
// 统计:信号量默认命令开始执行
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// 将命令Key push 到一个栈的栈顶
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
// 发出一些Hook通知...
// 获取 run() 的 Observable
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
}
});
}
}
复制代码
最终,终于找到封装 run() 方法的地方了,getUserExecutionObservable(_cmd)
方法比较简单,就是调用子类 HystrixCommand 实现的 getExecutionObservable()
来获得执行 run() 的订阅对象,也就是咱们自定义的业务代码。
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
Observable<R> userObservable;
try {
// 封装了重载的 run() 方法
userObservable = getExecutionObservable();
} catch (Throwable ex) {
userObservable = Observable.error(ex);
}
return userObservable
// Hystrix 执行回调处理
.lift(new ExecutionHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd));
}
复制代码
HystrixCommand
中的 getExecutionObservable() 方法:
final protected Observable<R> getExecutionObservable() {
// defer:对象被订阅时才触发 call() 方法的调用
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
// 订阅此 Observable 对象将触发 run() 方法的执行
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
})
}
复制代码
至此,获取 run()
业务逻辑订阅对象的整个流程基本就分析清楚了,toObservable()
是获取订阅对象的入口,为了加上 hystrix 的各类特性,又嵌了多层建立了最终的 Observable 对象,基本上是每一个子方法就为订阅对象增长一个特性。
toObservable()
:获取订阅对象的入口。applyHystrixSemantics(_cmd)
:应用 Hystrix 断路器或信号量,断路器打开或没法获取信号量就直接拒绝走降级。executeCommandAndObserve(_cmd)
:若是启用了超时,为订阅对象增长超时处理器、executeCommandWithSpecifiedIsolation
:根据配置的隔离策略,返回不一样的订阅对象;线程池隔离就会将订阅对象扔到一个 HystrixContextScheduler
中去调度执行。getUserExecutionObservable(_cmd)
:返回真正封装了 run() 业务逻辑的订阅对象。