专栏系列文章:SpringCloud系列专栏java
系列文章:web
SpringCloud 源码系列(1)— 注册中心Eureka 之 启动初始化安全
SpringCloud 源码系列(2)— 注册中心Eureka 之 服务注册、续约markdown
SpringCloud 源码系列(3)— 注册中心Eureka 之 抓取注册表并发
SpringCloud 源码系列(4)— 注册中心Eureka 之 服务下线、故障、自我保护机制app
SpringCloud 源码系列(5)— 注册中心Eureka 之 EurekaServer集群负载均衡
SpringCloud 源码系列(6)— 注册中心Eureka 之 总结篇异步
SpringCloud 源码系列(7)— 负载均衡Ribbon 之 RestTemplateide
SpringCloud 源码系列(8)— 负载均衡Ribbon 之 核心原理函数
SpringCloud 源码系列(9)— 负载均衡Ribbon 之 核心组件与配置
SpringCloud 源码系列(10)— 负载均衡Ribbon 之 HTTP客户端组件
SpringCloud 源码系列(11)— 负载均衡Ribbon 之 重试与总结篇
SpringCloud 源码系列(12)— 服务调用Feign 之 基础使用篇
SpringCloud 源码系列(13)— 服务调用Feign 之 扫描@FeignClient注解接口
SpringCloud 源码系列(14)— 服务调用Feign 之 构建@FeignClient接口动态代理
SpringCloud 源码系列(15)— 服务调用Feign 之 结合Ribbon进行负载均衡请求
SpringCloud 源码系列(16)— 熔断器Hystrix 之 基础入门篇
SpringCloud 源码系列(17)— 熔断器Hystrix 之 获取执行订阅对象Observable
在构造 HystrixCommand 时,会去初始化 Hystrix 线程池 HystrixThreadPool
,跟进去能够发现,初始化的逻辑就在默认实现类 HystrixThreadPoolDefault
的构造方法中,HystrixThreadPoolDefault 也是线程池调度的核心组件。
其中,ThreadPoolExecutor 线程池是由 concurrencyStrategy.getThreadPool(threadPoolKey, properties)
这段代码建立的,queueSize
队列大小默认是 -1,队列 queue
是从 ThreadPoolExecutor 中获取的,因此线程池的构造还得继续看 concurrencyStrategy.getThreadPool。
class HystrixThreadPoolDefault implements HystrixThreadPool {
private final BlockingQueue<Runnable> queue;
private final ThreadPoolExecutor threadPool;
private final HystrixThreadPoolMetrics metrics;
private final int queueSize;
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
// 线程池配置
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
// 并发策略
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
// 队列大小,默认 -1
this.queueSize = properties.maxQueueSize().get();
// hystrix 线程池度量器
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties), // 建立线程池
properties);
// 默认返回的是一个使用 SynchronousQueue 队列的线程池,核心线程数和最大线程数都是默认值 10
this.threadPool = this.metrics.getThreadPool();
// SynchronousQueue
this.queue = this.threadPool.getQueue();
/* strategy: HystrixMetricsPublisherThreadPool */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}
}
复制代码
接着看 getThreadPool
方法,这个方法就是根据配置构建线程池 ThreadPoolExecutor 的,从这个方法咱们能够获得以下信息。
getThreadFactory
返回的 ThreadFactory 能够看出,线程名称的格式是:"hystrix-{threadPoolKey}-{number}"
,这跟咱们在日志中看到的 hystrix 线程名称是一致的。getBlockingQueue
方法获取一个队列,可是默认状况下 maxQueueSize
为 -1,那么返回的队列即是 SynchronousQueue
,这是一个无容量的队列,就是说默认状况下任务不会进入队列,若是线程池线程满了将直接拒绝任务。总结一下,默认状况下,dynamicCoreSize
、dynamicMaximumSize
都是 10,maxQueueSize
等于 -1,allowMaximumSizeToDivergeFromCoreSize
默认为 false。
那么默认状况下建立的 ThreadPoolExecutor 就是备以下特性:
SynchronousQueue
队列。"hystrix-{threadPoolKey}-{number}"
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
// 名称格式:"hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet()
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
// 是否容许核心线程数扩大到最大线程数,默认false
final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
// 核心线程数
final int dynamicCoreSize = threadPoolProperties.coreSize().get();
// 线程存活时间
final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
// 最大队列数 maxQueueSize 默认为 -1
final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
// maxQueueSize <= 0 ==> SynchronousQueue
// maxQueueSize > 0 ==> LinkedBlockingQueue(maxQueueSize)
final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
// 容许扩展到最大线程数
if (allowMaximumSizeToDivergeFromCoreSize) {
// 最大线程数
final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
if (dynamicCoreSize > dynamicMaximumSize) {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
}
// 不容许扩展到最大线程数,核心线程数 == 最大线程数。默认会进入这里
else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
}
private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {
return new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
thread.setDaemon(true);
return thread;
}
};
}
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
// maxQueueSize 默认为 -1
if (maxQueueSize <= 0) {
return new SynchronousQueue<Runnable>();
} else {
return new LinkedBlockingQueue<Runnable>(maxQueueSize);
}
}
复制代码
上一篇文章中咱们已经分析出 Observable 订阅对象被调度应该是在 executeCommandWithSpecifiedIsolation(_cmd)
方法中的,入口就是 subscribeOn(Scheculer scheduler)
这个订阅。这个 scheduler 是经过上一节中初始化的 Hystrix线程池 HystrixThreadPool
来获取的,threadPool.getScheduler(Func0 func)
返回的是一个 HystrixContextScheduler
调度器。
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
// 线程池隔离
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
//...
// 获取 run() 的 Observable
return getUserExecutionObservable(_cmd);
}
})
.doOnTerminate(new Action0() {...})
.doOnUnsubscribe(new Action0() {...})
// 将 defer 返回的 Observable 放到一个调度器中异步执行,调度器 ==> HystrixContextScheduler
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
// 判断是否中断线程执行,超时后中断执行
return properties.executionIsolationThreadInterruptOnTimeout().get() &&
_cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
}
// 信号量隔离
else {//...}
}
复制代码
接着看如何建立调度器的。
经过 getScheduler
方法能够了解到,首先会经过 touchConfig()
更新线程池 ThreadPoolExecutor
的配置,这说明咱们是能够在运行时动态修改线程池的参数的。可是会发现,只能修改线程池的 corePoolSize、maximumPoolSize、keepAliveTime
三个参数,因此是没法动态扩容线程池队列的。
以后建立了 HystrixContextScheduler
,进入构造方法能够看到,内部又建立了一个调度器 actualScheduler
,实际的类型是 ThreadPoolScheduler
,能够猜测最终的调度任务应该是在 ThreadPoolScheduler 中的。
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
// 动态更改线程池配置
touchConfig();
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
private void touchConfig() {
final int dynamicCoreSize = properties.coreSize().get();
final int configuredMaximumSize = properties.maximumSize().get();
int dynamicMaximumSize = properties.actualMaximumSize();
final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
boolean maxTooLow = false;
if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
dynamicMaximumSize = dynamicCoreSize;
maxTooLow = true;
}
if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
threadPool.setCorePoolSize(dynamicCoreSize);
threadPool.setMaximumPoolSize(dynamicMaximumSize);
}
threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
}
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.concurrencyStrategy = concurrencyStrategy;
this.threadPool = threadPool;
// actualScheduler => rx.Scheduler => ThreadPoolScheduler
this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}
复制代码
下面是线程池调度的流程源码,建立调度器和调度执行的过程看起来就像是层层嵌套代理。
HystrixContextScheduler
,但实际的调度器是 ThreadPoolScheduler
。createWorker()
建立 Hystrix上下文调度工做者 HystrixContextSchedulerWorker
,但又由 ThreadPoolScheduler 建立了一个代理工做者 ThreadPoolWorker
。schedule
调度方法,这个调度方法最后又用 ThreadPoolWorker 来执行调度。综上来看,真正的调度逻辑,其实就是在 HystrixContextSchedulerWorker 和 ThreadPoolWorker 的调度方法 schedule
。
HystrixContextScheduler ::
public Worker createWorker() {
// actualScheduler.createWorker() ==> ThreadPoolWorker
return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
ThreadPoolScheduler ::
public Worker createWorker() {
return new ThreadPoolWorker(threadPool, shouldInterruptThread);
}
HystrixContextSchedulerWorker ::
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
// 队列满了就直接抛出异常
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
// 调度 worker ==> ThreadPoolWorker
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}
ThreadPoolWorker ::
public Subscription schedule(final Action0 action) {
// 封装调度Action
ScheduledAction sa = new ScheduledAction(action);
subscription.add(sa);
sa.addParent(subscription);
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
// 提交任务到线程池
FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
// 中断任务的订阅 ==> shouldInterruptThread.call() 判断是否超时中断任务
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
return sa;
}
复制代码
为了便于理解,将源码抽象成以下的调度流程图:
在 HystrixContextSchedulerWorker
的调度方法中,先调用 threadPool.isQueueSpaceAvailable()
判断线程池队列是否还有可用空间,若是没有就会抛出拒绝异常,若是有就会用 ThreadPoolWorker
进行调度。
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
// 队列满了就直接抛出异常
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
// 调度 worker ==> ThreadPoolWorker
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}
复制代码
再来看看 isQueueSpaceAvailable
的逻辑。
默认状况下 queueSize 为 -1,若是 queueSize <= 0,就表示有可用空间,但其实是一个 SynchronousQueue 无容量队列。之因此返回 true,任务就会直接丢到线程池去执行,若是线程池工做线程满了,就由线程池自己来拒绝任务,这一点从它的注释也能够了解到。
若是设置了队列大小(配置 maxQueueSize),且队列已使用的容量小于 queueSizeRejectionThreshold(默认为5),才表示队列有可用空间。这里可能会感受有点疑惑,为什么不是判断队列是否还有剩余容量?好比判断 threadPool.getQueue().remainingCapacity() > 0
来肯定是否还有空间?
经过它的注释能够了解到,实际上是为了实现动态扩容的目的,由于队列的大小是不能动态修改的,但为了能在运行时达到队列动态扩容的目的,它用了另外一个配置 queueSizeRejectionThreshold
来控制进入队列的数量。好比 maxQueueSize 配置 100,但 queueSizeRejectionThreshold 默认为 5,因此此时队列实际上最多只会进入5个任务;运行时动态修改 queueSizeRejectionThreshold 为 20,这个时候队列最多就会进入20个任务了;所以在配置时 maxQueueSize 要大于 queueSizeRejectionThreshold
才有意义。这种设计仍是值得借鉴的。
/** * Whether the threadpool queue has space available according to the <code>queueSizeRejectionThreshold</code> settings. * * Note that the <code>queueSize</code> is an final instance variable on HystrixThreadPoolDefault, and not looked up dynamically. * The data structure is static, so this does not make sense as a dynamic lookup. * The <code>queueSizeRejectionThreshold</code> can be dynamic (up to <code>queueSize</code>), so that should * still get checked on each invocation. * <p> * If a SynchronousQueue implementation is used (<code>maxQueueSize</code> <= 0), it always returns 0 as the size so this would always return true. */
@Override
public boolean isQueueSpaceAvailable() {
// 队列大小默认为 -1
if (queueSize <= 0) {
// we don't have a queue so we won't look for space but instead let the thread-pool reject or not
return true;
} else {
// 队列已经使用的容量 超过了 队列容量拒绝阈值,queueSizeRejectionThreshold 默认为 5
return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
}
}
复制代码
最后,再来看 ThreadPoolWorker 的调度方法,在这个方法里面,终于看到提交到线程池的代码了,这一步就真正实现了基于线程池的隔离了。
@Override
public Subscription schedule(final Action0 action) {
if (subscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}
// This is internal RxJava API but it is too useful.
ScheduledAction sa = new ScheduledAction(action);
subscription.add(sa);
sa.addParent(subscription);
// 线程池
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
// 提交任务到线程池
FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
// 中断任务的订阅 ==> shouldInterruptThread.call() 判断是否超时中断任务
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
return sa;
}
复制代码
schedule 方法最后,向提交到线程池的 ScheduledAction 添加了一个 FutureCompleterWithConfigurableInterrupt
订阅对象,它会在取消订阅的时候取消任务的执行,好比任务超时了,这个时候就会取消任务的执行。
private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
private final FutureTask<?> f;
private final Func0<Boolean> shouldInterruptThread;
private final ThreadPoolExecutor executor;
private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
this.f = f;
this.shouldInterruptThread = shouldInterruptThread;
this.executor = executor;
}
@Override
public void unsubscribe() {
executor.remove(f);
// 判断是否取消任务
if (shouldInterruptThread.call()) {
f.cancel(true);
} else {
f.cancel(false);
}
}
}
复制代码
前面分析到 executeCommandAndObserve(_cmd)
这个方法有以下这段代码,executeCommandWithSpecifiedIsolation(_cmd)
返回的 Observable 主要是采起不一样的隔离策略,而后,若是启用了超时(默认启用),会增长一个 HystrixObservableTimeoutOperator
操做器,看起来就是在控制超时相关的。
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
//...
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
// HystrixObservableTimeoutOperator 控制超时
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
// 设置订阅回调
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
复制代码
再继续看 HystrixObservableTimeoutOperator
,它会返回一个 Subscriber
对原 Observable 进行一个处理。
HystrixContextRunnable
回调,就是抛出 HystrixTimeoutException 这个异常。TimerListener
时间监听器,这个监听器的间隔时间是 getIntervalTimeInMilliseconds()
返回的时间,就是命令执行超时时间。isCommandTimedOut
的状态,若是超时后 isCommandTimedOut 仍是 NOT_EXECUTED
,就会更新到 TIMED_OUT
,并取消任务的执行,而后发出超时的异常。TimerListener
添加到了 HystrixTimer
中,因此时间控制的核心逻辑应该是在 HystrixTimer 中。isCommandTimedOut
对应的 TimedOutStatus 有三种状态:NOT_EXECUTED, COMPLETED, TIMED_OUT
。
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
final AbstractCommand<R> originalCommand;
public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
this.originalCommand = originalCommand;
}
@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
final CompositeSubscription s = new CompositeSubscription();
child.add(s);
// 超时回调,抛出 HystrixTimeoutException 异常
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
@Override
public void run() {
child.onError(new HystrixTimeoutException());
}
});
// 超时监听器
TimerListener listener = new TimerListener() {
// tick 在每次间隔时间会调用一次
@Override
public void tick() {
// 超时后更新状态
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
// 通知超时失败
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
// 中止本来的请求
s.unsubscribe();
// 抛出超时异常
timeoutRunnable.run();
}
}
// 返回间隔时间,默认就是命令执行超时时间
@Override
public int getIntervalTimeInMilliseconds() {
return originalCommand.properties.executionTimeoutInMilliseconds().get();
}
};
// 添加监听器,开始计算超时
final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
// 设置到原命令中
originalCommand.timeoutTimer.set(tl);
Subscriber<R> parent = new Subscriber<R>() {
@Override
public void onCompleted() {
if (isNotTimedOut()) {
tl.clear(); // 清除 TimerListener
child.onCompleted();
}
}
//...
private boolean isNotTimedOut() {
// 任务执行完成,更新 isCommandTimedOut 状态
return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
}
};
s.add(parent);
return parent;
}
}
复制代码
接着看 HystrixTimer 的 addTimerListener
方法,这个方法就比较好理解了,就是初始化任务调度器,而后调度 TimerListener 的执行,调度器的延迟执行时间和间隔周期都是 TimerListener 返回的命令超时时间。
总结一下 TimerListener 和 HystrixTimer 整合起来,其实就是检测任务是否超时的,任务执行超时后就会抛出超时异常,而后取消任务的执行,后面应该就会进入降级的逻辑了。
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
// 初始化 executor 调度器:ScheduledThreadPoolExecutor
startThreadIfNeeded();
Runnable r = new Runnable() {
@Override
public void run() {
try {
// 触发监听器
listener.tick();
} catch (Exception e) {
logger.error("Failed while ticking TimerListener", e);
}
}
};
// 超时时间默认1000毫秒,每隔1000毫秒调度一次
ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r,
listener.getIntervalTimeInMilliseconds(), // 延迟多久执行
listener.getIntervalTimeInMilliseconds(), // 执行周期
TimeUnit.MILLISECONDS);
return new TimerReference(listener, f);
}
复制代码
在 Hystrix 基础篇中,咱们提到了 Hystrix 执行错误的六种类型,以下图所示,除了 BAD_REQUEST,都会降级进入回调方法中。
在 AbstractCommand
中,能够看到有以下6个处理错误的方法,分别针对 Hystrix 的6中错误类型。在分析 HystrixCommand 的执行流程中,咱们知道有不少的回调方法,其中错误回调确定就会调用下面其中的一个方法来处理错误,这个就不在看了。
须要注意的是,除了 handleBadRequestByEmittingError()
这个处理 BAD_REQUEST 错误的方法,其他5个方法最后都会调用 getFallbackOrThrowException
方法来获取回调方法的Observable,这样就合上图中对应起来了。说明错误回调的封装就是 getFallbackOrThrowException
。
// 信号量拒绝回调
private Observable<R> handleSemaphoreRejectionViaFallback() {
Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
executionResult = executionResult.setExecutionException(semaphoreRejectionException);
eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);
return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
"could not acquire a semaphore for execution", semaphoreRejectionException);
}
// 短路回调
private Observable<R> handleShortCircuitViaFallback() {
eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
executionResult = executionResult.setExecutionException(shortCircuitException);
return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
"short-circuited", shortCircuitException);
}
// 线程池拒绝回调
private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) {
eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
threadPool.markThreadRejection();
return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION,
"could not be queued for execution", underlying);
}
// 超时回调
private Observable<R> handleTimeoutViaFallback() {
return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT,
"timed-out", new TimeoutException());
}
// 失败回调
private Observable<R> handleFailureViaFallback(Exception underlying) {
//...
eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);
return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
}
// BAD_REQUEST 处理
private Observable<R> handleBadRequestByEmittingError(Exception underlying) {
Exception toEmit = underlying;
//...
return Observable.error(toEmit);
}
复制代码
接着来看回调的处理方法 getFallbackOrThrowException
,能够获得以下信息:
getFallbackSemaphore()
获取的信号量,默认返回的实际类型是 TryableSemaphore
,限流数默认是 10。而后会经过这个信号量获取一个许可证后才去调用回调方法。getFallbackObservable()
获取回调 Observable,若是用户重载了 getFallback()
方法,就返回 getFallback() 的 Observable;若是没有重载,将抛出 "No fallback available."
的异常。private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
// 启用回调
if (properties.fallbackEnabled().get()) {
// 设置当前线程
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {...};
// 回调通知
final Action1<R> markFallbackEmit = new Action1<R>() {...};
// 回调完成通知
final Action0 markFallbackCompleted = new Action0() {...};
// 回调错误后处理
final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {...};
// 回调信号量 => TryableSemaphore
final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
// 释放信号量许可证
final Action0 singleSemaphoreRelease = new Action0() {...};
// 获取回调 Observable
Observable<R> fallbackExecutionChain;
// 获取信号量许可证
if (fallbackSemaphore.tryAcquire()) {
try {
// 用户是否认义了回调方法,即重写 getFallback()
if (isFallbackUserDefined()) {
executionHook.onFallbackStart(this);
fallbackExecutionChain = getFallbackObservable();
} else {
// 抛出异常:"No fallback available."
fallbackExecutionChain = getFallbackObservable();
}
} catch (Throwable ex) {
fallbackExecutionChain = Observable.error(ex);
}
// 执行回调
return fallbackExecutionChain
.doOnEach(setRequestContext)
.lift(new FallbackHookApplication(_cmd))
.lift(new DeprecatedOnFallbackHookApplication(_cmd))
.doOnNext(markFallbackEmit)
.doOnCompleted(markFallbackCompleted)
.onErrorResumeNext(handleFallbackError)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} else {
return handleFallbackRejectionByEmittingError();
}
} else {
return handleFallbackDisabledByEmittingError(originalException, failureType, message);
}
}
复制代码
下面用一张图总结下Hystrix任务提交到线程池执行的流程。
在 AbstractCommand 构造方法中,初始化了断路器 HystrixCircuitBreaker
, 若是启用了断路器(circuitBreakerEnabled
),就从 HystrixCircuitBreaker.Factory 中获取一个 断路器,默认实现类是 HystrixCircuitBreakerImpl
。若是未启用断路器,默认实现类则是 NoOpCircuitBreaker
,就是什么都不控制的断路器。
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker,
this.commandGroup, this.commandKey, this.properties, this.metrics);
复制代码
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor, HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
if (enabled) {
if (fromConstructor == null) {
// 从 Factory 中获取默认的 断路器
return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
} else {
return fromConstructor;
}
} else {
// 禁用断路器就返回什么都不操做的实现类
return new NoOpCircuitBreaker();
}
}
复制代码
HystrixCircuitBreakerImpl
的构造方法主要须要 HystrixCommandProperties
和 HystrixCommandMetrics
,可想而知,断路器须要统计 hystrix 请求的错误、超时、拒绝、成功等次数,而后决定是否打开断路器,那么这些统计的数据来源就是 HystrixCommandMetrics
.
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
}
复制代码
以前分析 applyHystrixSemantics(_cmd)
时,咱们看到了断路器的使用,applyHystrixSemantics 是应用Hystrix的核心入口,会先经过断路器判断是否容许放行请求,断路器不容许则会走断路器拒绝降级的方法。
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// 断路器是否容许请求
if (circuitBreaker.allowRequest()) {
//...
} else {
// 短路 => 降级
return handleShortCircuitViaFallback();
}
}
复制代码
接着看 allowRequest()
方法:
public boolean allowRequest() {
// 断路器手动强制打开
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
// 断路器手动强制关闭
if (properties.circuitBreakerForceClosed().get()) {
// 调度一次,进行统计,决定是否打开断路器
isOpen();
return true;
}
// 断路器未打开 或者 运行打开
return !isOpen() || allowSingleTest();
}
复制代码
isOpen()
方法判断断路器是否打开:
circuitOpen
是否为 true,是则表示断路器打开了。也就是说断路器的状态是由 circuitOpen
这个 AtomicBoolean 来控制的。HystrixCommandMetrics
中获取统计的计数 HealthCounts
,后面的逻辑就是根据 HealthCounts 计算是否要打开断路器。总的请求数 < 断路器请求阈值(默认20)
就不会打开断路器。错误百分比率 < 断路器错误比率阈值(默认50%)
就不会打开断路器。Hystrix 断路器的设计很好的利用了原子类的特性,circuitOpen 的更新采用基于 CAS 指令的 compareAndSet
,高并发状况下也能安全更新,更新成功再设置断路器打开时间,更新失败则不更新。
private AtomicBoolean circuitOpen = new AtomicBoolean(false);
public boolean isOpen() {
// circuitOpen 是否打开
if (circuitOpen.get()) {
return true;
}
// 获取请求计数
HealthCounts health = metrics.getHealthCounts();
// 小于断路器请求的阈值,默认为20,至少超过20个请求后才会开启断路器
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
return false;
}
// 错误百分比
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
// 打开断路器
if (circuitOpen.compareAndSet(false, true)) {
// 设置断路器打开的时间
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
return true;
}
}
}
复制代码
若是断路器已经打开了,这时断路器会休眠一段时间后,放一个请求去测试被调用方是否已经恢复,这就是半开状态。
看 allowSingleTest()
这个方法,是否进入半开状态的依据是:
OPEN -> HALF-OPEN
。public boolean allowSingleTest() {
// 断路器打开时间
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
// 断路器打开 且 当前时间 > (断路器打开时间 + 断路器休眠窗口时间(默认5000毫秒))
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// 断路器打开时间设置为当前时间
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
return true;
}
}
return false;
复制代码
那断路器打开了什么时候恢复呢?跟踪一下 circuitOpen
就知道了,HystrixCircuitBreaker
有个方法 markSuccess()
会去在断路器打开时关闭断路器。
public void markSuccess() {
if (circuitOpen.get()) {
// 关闭断路器
if (circuitOpen.compareAndSet(true, false)) {
metrics.resetStream();
}
}
}
复制代码
在 executeCommandAndObserve(_cmd)
这个方法中,能够看到有两个回调函数 markEmits
、markOnCompleted
,他们会在请求执行成功和完成后被回调执行,回调函数里调用了 circuitBreaker.markSuccess();
来告诉断路器请求成功了,而后断路器就会进入关闭状态,HALF -> CLOSE
。
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
// 标记命令已经执行
final Action1<R> markEmits = new Action1<R>() {
@Override
public void call(R r) {
if (commandIsScalar()) {
//...
// 告诉断路器请求成功
circuitBreaker.markSuccess();
}
}
};
// 标记命令执行结束
final Action0 markOnCompleted = new Action0() {
@Override
public void call() {
if (!commandIsScalar()) {
//...
// 告诉断路器请求成功
circuitBreaker.markSuccess();
}
}
};
// 处理回调
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {...};
// 设置当前线程
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {...};
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
// 超时会由 HystrixObservableTimeoutOperator 处理,抛出 HystrixTimeoutException 超时异常
execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
// 设置订阅回调
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
复制代码
最后,仍是用一张图总结下断路器的工做机制。