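Hystrix is a circuit-breaker middleware that lets a call fail fast and fall back to a backup path. It decides whether to trip the circuit from the proportion of failed calls inside a sliding window. There are many ways to implement a sliding window and many ways to count metrics; a common one is to maintain counters with atomic increments and decrements on AtomicInteger. Those alternatives are not discussed here.
Hystrix is built on RxJava, so how does it use RxJava to aggregate metrics and implement the sliding window? This article is not an RxJava tutorial; it explains the approach Hystrix takes to get this done.
Start with the main entry point of HystrixCommand execution: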
    public Observable<R> toObservable() {
        final AbstractCommand<R> _cmd = this;

        final Action0 terminateCommandCleanup = new Action0() {
            @Override
            public void call() {
                if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                    handleCommandEnd(false); // user code never ran
                } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                    handleCommandEnd(true); // user code did run
                }
            }
        };

        // mark the command as CANCELLED and store the latency (in addition to standard cleanup)
        final Action0 unsubscribeCommandCleanup = new Action0() {
            @Override
            public void call() {
                if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
                    // ... (code omitted) ...
                    handleCommandEnd(false); // user code never ran
                } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
                    // ... (code omitted) ...
                    handleCommandEnd(true); // user code did run
                }
            }
        };

        // ... (code omitted) ...

        return Observable.defer(new Func0<Observable<R>>() {
            // ... (code omitted) ...
                return afterCache
                        .doOnTerminate(terminateCommandCleanup)
                        .doOnUnsubscribe(unsubscribeCommandCleanup)
                        .doOnCompleted(fireOnCompletedHook);
            }
        });
    }
On the Observable returned by this entry point, doOnTerminate and doOnUnsubscribe both trigger handleCommandEnd. As the name suggests, it handles the work that must happen when a command finishes.
    private void handleCommandEnd(boolean commandExecutionStarted) {
        // ... (code omitted) ...
        executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
        if (executionResultAtTimeOfCancellation == null) {
            metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
        } else {
            metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
        }
        // ... (code omitted) ...
    }
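The two cleanup actions in toObservable rely on compareAndSet so that handleCommandEnd runs only once, whichever of terminate or unsubscribe arrives first. The following is a minimal stand-alone sketch of that idiom, not Hystrix code: the class CommandEndSketch, its State enum and the cleanupRuns counter are hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the command state machine (not Hystrix code).
class CommandEndSketch {
    enum State { CHAIN_CREATED, USER_CODE_EXECUTED, TERMINAL, UNSUBSCRIBED }

    final AtomicReference<State> state = new AtomicReference<>(State.CHAIN_CREATED);
    final AtomicInteger cleanupRuns = new AtomicInteger(); // stands in for handleCommandEnd

    // Same shape as terminateCommandCleanup: cleanup runs only if this call wins the CAS.
    void onTerminate() {
        if (state.compareAndSet(State.CHAIN_CREATED, State.TERMINAL)
                || state.compareAndSet(State.USER_CODE_EXECUTED, State.TERMINAL)) {
            cleanupRuns.incrementAndGet();
        }
    }

    // Same shape as unsubscribeCommandCleanup.
    void onUnsubscribe() {
        if (state.compareAndSet(State.CHAIN_CREATED, State.UNSUBSCRIBED)
                || state.compareAndSet(State.USER_CODE_EXECUTED, State.UNSUBSCRIBED)) {
            cleanupRuns.incrementAndGet();
        }
    }

    // Terminate first, then unsubscribe: the second call finds the state already final.
    static int demo() {
        CommandEndSketch cmd = new CommandEndSketch();
        cmd.state.set(State.USER_CODE_EXECUTED); // pretend the user code ran
        cmd.onTerminate();
        cmd.onUnsubscribe();
        return cmd.cleanupRuns.get();
    }

    public static void main(String[] args) {
        System.out.println("cleanup ran " + demo() + " time(s)");
    }
}
```

Because each transition starts from a non-final state, the second caller's compareAndSet fails and the cleanup is not repeated.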
In handleCommandEnd, note the call to metrics.markCommandDone: it invokes markCommandDone on HystrixCommandMetrics and passes in an executionResult. So what is ExecutionResult?
Here is an excerpt of the class:
    public class ExecutionResult {
        private final EventCounts eventCounts;
        private final Exception failedExecutionException;
        private final Exception executionException;
        private final long startTimestamp;
        private final int executionLatency; // time spent in run() method
        private final int userThreadLatency; // time elapsed between caller thread submitting request and response being visible to it
        private final boolean executionOccurred;
        private final boolean isExecutedInThread;
        private final HystrixCollapserKey collapserKey;

        private static final HystrixEventType[] ALL_EVENT_TYPES = HystrixEventType.values();
        private static final int NUM_EVENT_TYPES = ALL_EVENT_TYPES.length;
        private static final BitSet EXCEPTION_PRODUCING_EVENTS = new BitSet(NUM_EVENT_TYPES);
        private static final BitSet TERMINAL_EVENTS = new BitSet(NUM_EVENT_TYPES);
        // ... (rest of the class not shown) ...
    }
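As you have probably guessed, this class records the execution result of the current HystrixCommand. It holds more than the result itself: it also carries the various states and any exception that occurred. It shows up in each of the Observables described in the article on Hystrix's execution flow and accompanies the HystrixCommand through its whole lifecycle.
Back to the point above: once the command has finished, markCommandDone on HystrixCommandMetrics is called: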
    void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey,
                         HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
        HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
        if (executionStarted) {
            concurrentExecutionCount.decrementAndGet();
        }
    }
This ends up calling HystrixThreadEventStream.executionDone. HystrixThreadEventStream is held in a ThreadLocal, so each instance is bound to the current thread:
    // HystrixThreadEventStream.threadLocalStreams
    private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams = new ThreadLocal<HystrixThreadEventStream>() {
        @Override
        protected HystrixThreadEventStream initialValue() {
            return new HystrixThreadEventStream(Thread.currentThread());
        }
    };
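To make the thread binding concrete, here is a small sketch of the same ThreadLocal pattern using only the JDK. PerThreadStreamSketch is a hypothetical stand-in for HystrixThreadEventStream; it only shows that repeated lookups on one thread return the same instance while another thread gets its own.

```java
// Hypothetical stand-in for a per-thread event stream (not Hystrix code).
class PerThreadStreamSketch {
    final String ownerThread;

    PerThreadStreamSketch(Thread owner) {
        this.ownerThread = owner.getName();
    }

    // One instance per thread, created lazily on that thread's first access.
    private static final ThreadLocal<PerThreadStreamSketch> STREAMS =
            ThreadLocal.withInitial(() -> new PerThreadStreamSketch(Thread.currentThread()));

    static PerThreadStreamSketch getInstance() {
        return STREAMS.get();
    }

    // Returns true when the same thread sees one instance and a second thread sees another.
    static boolean demo() {
        PerThreadStreamSketch first = getInstance();
        PerThreadStreamSketch second = getInstance();
        PerThreadStreamSketch[] fromWorker = new PerThreadStreamSketch[1];
        Thread worker = new Thread(() -> fromWorker[0] = getInstance(), "worker");
        worker.start();
        try {
            worker.join(); // join makes the worker's write visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return first == second
                && fromWorker[0] != first
                && "worker".equals(fromWorker[0].ownerThread);
    }

    public static void main(String[] args) {
        System.out.println("per-thread instances: " + demo());
    }
}
```

Since each thread writes only to its own instance, the write path needs no locking at this stage.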
The code of executionDone is as follows:
    public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
        HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
        writeOnlyCommandCompletionSubject.onNext(event);
    }
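Here a HystrixCommandCompletion is built from executionResult, commandKey and threadPoolKey and then written through writeOnlyCommandCompletionSubject. We will come back to writeOnlyCommandCompletionSubject in a moment. First, what is HystrixCommandCompletion? It holds an ExecutionResult and a HystrixRequestContext, and it is a kind of HystrixEvent that marks the completion of a command execution. The event is the carrier for the HystrixCommand's request information, execution result, state and other data at that point in time.
As the class diagram above shows, HystrixCommandCompletion is not the only event type; the others are not covered one by one here.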
When onNext is called on writeOnlyCommandCompletionSubject, writeCommandCompletionsToShardedStreams is triggered and its call() method runs:
    private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() {
        @Override
        public void call(HystrixCommandCompletion commandCompletion) {
            HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());
            commandStream.write(commandCompletion);

            if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) {
                HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey());
                threadPoolStream.write(commandCompletion);
            }
        }
    };
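This method writes the HystrixCommandCompletion through HystrixCommandCompletionStream. If the command was executed in a thread pool (thread isolation) or was rejected by the thread pool, it is written a second time through HystrixThreadPoolCompletionStream. The two streams are conceptually alike, so we use the former to explain what they are. HystrixCommandCompletionStream instances are kept in memory, keyed by commandKey. Calling write delegates to the internal field writeOnlySubject, which is a Subject (an RxJava type) wrapped in a SerializedSubject so that writes coming from different threads are delivered one at a time. Calling share() on it yields an Observable, readOnlyStream, through which the outside world can read the data passing through the Subject. In short, a Subject is a bridge between two Observables; it has two generic type parameters for the incoming and outgoing data types, and here both are HystrixCommandCompletion.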
    HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {
        this.commandKey = commandKey;
        this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create());
        this.readOnlyStream = writeOnlySubject.share();
    }
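For readers who are not familiar with RxJava, the write-only and read-only split can be pictured with a plain-JDK stand-in. The sketch below is an analogy only and does not use RxJava: CompletionStreamSketch and its write and observe methods are hypothetical, and the synchronized keyword plays the role that SerializedSubject plays in the real code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Plain-JDK analogy of a publish-style subject (assumption: not the RxJava implementation).
class CompletionStreamSketch<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    // Write side: synchronized so that events from several threads are delivered one at a time.
    synchronized void write(T event) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    // Read side: consumers only receive events written after they subscribed.
    void observe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    static List<String> demo() {
        CompletionStreamSketch<String> stream = new CompletionStreamSketch<>();
        List<String> seen = new ArrayList<>();
        stream.write("EARLY");     // nobody is listening yet, so this event is not replayed
        stream.observe(seen::add);
        stream.write("SUCCESS");
        stream.write("TIMEOUT");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the split is that producers (commands finishing) only ever call write, while the metric streams only ever subscribe through the read side.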
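Tracing from the source, we now understand how the HystrixCommandCompletion stream is written (the other event types follow the same idea and are not explained one by one). So how is it collected?
Go back to the initialization of AbstractCommand: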
    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey,
                              HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
                              HystrixCommandProperties.Setter commandPropertiesDefaults,
                              HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
                              HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
                              HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
        // ... (code omitted) ...
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        // ... (code omitted) ...
    }
This initializes the command metrics:
    HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey,
                          HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {
        super(null);
        this.key = key;
        this.group = commandGroup;
        this.threadPoolKey = threadPoolKey;
        this.properties = properties;

        healthCountsStream = HealthCountsStream.getInstance(key, properties);
        rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties);
        cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties);

        rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties);
        rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties);
        rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties);
    }
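There are many XXXStream.getInstance() calls here. Each of these streams is the concrete implementation that collects and aggregates metrics for one particular purpose. Their UML class diagram is shown below.
BucketedCounterStream implements the basic bucketed counter. BucketedCumulativeCounterStream builds cumulative counting on top of that parent class, and BucketedRollingCounterStream builds sliding-window counting on top of it. The subclasses of these two are the concrete implementations for specific metrics.
The rest of the article is split into two parts, cumulative counting and sliding-window counting, using CumulativeCommandEventCounterStream and HealthCountsStream respectively as the detailed examples.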
    protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,
                                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {
        this.numBuckets = numBuckets;
        this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call(Observable<Event> eventBucket) {
                return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
            }
        };

        final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>();
        for (int i = 0; i < numBuckets; i++) {
            emptyEventCountsToStart.add(getEmptyBucketSummary());
        }

        this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call() {
                return inputEventStream
                        .observe()
                        .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
                        .flatMap(reduceBucketToSummary)
                        .startWith(emptyEventCountsToStart);
            }
        });
    }
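The parent-class constructor has three main parts:
I. reduceBucketToSummary: how the data of each bucket is aggregated
The implementation of appendRawEventToBucket is supplied by the subclass, although the variants differ very little. Digging into HealthCountsStream, we can see that it uses HystrixCommandMetrics.appendEventToBucket: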
    public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = new Func2<long[], HystrixCommandCompletion, long[]>() {
        @Override
        public long[] call(long[] initialCountArray, HystrixCommandCompletion execution) {
            ExecutionResult.EventCounts eventCounts = execution.getEventCounts();
            for (HystrixEventType eventType : ALL_EVENT_TYPES) {
                switch (eventType) {
                    case EXCEPTION_THROWN:
                        break; // this is just a sum of other anyway - don't do the work here
                    default:
                        initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType);
                        break;
                }
            }
            return initialCountArray;
        }
    };
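This function adds up the counts of all events that arrive within one bucket's duration. initialCountArray can be read as the result accumulated from the previous n events in the bucket, and each array index is the ordinal of the corresponding HystrixEventType enum constant.
II. emptyEventCountsToStart: the initial buckets, numBuckets empty summaries that the stream emits before any real data; to be fancy, call them the genesis buckets
III. The window definition: the first argument is the duration of each bucket and the second is the time unit. RxJava's window operator does the grouping for us.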
.window(bucketSizeInMs, TimeUnit.MILLISECONDS)
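The three parts above can be imitated without RxJava to see what ends up in each bucket. This is a simplified sketch under stated assumptions: BucketingSketch, its three-value EventType enum and the Event class are hypothetical, each event counts once, and timestamps are taken relative to the start of the stream and must be smaller than endMs.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK imitation of time bucketing followed by a per-bucket reduce (not Hystrix code).
class BucketingSketch {
    enum EventType { SUCCESS, FAILURE, TIMEOUT } // hypothetical subset of event types

    static final class Event {
        final long timestampMs;
        final EventType type;

        Event(long timestampMs, EventType type) {
            this.timestampMs = timestampMs;
            this.type = type;
        }
    }

    // Counterpart of appendRawEventToBucket: fold one event into the bucket, indexed by ordinal.
    static long[] append(long[] bucket, Event event) {
        bucket[event.type.ordinal()]++;
        return bucket;
    }

    // Counterpart of window(bucketSizeInMs).flatMap(reduce): one summary per time slice,
    // including slices in which no event arrived.
    static List<long[]> bucketize(List<Event> events, int bucketSizeMs, long endMs) {
        List<long[]> buckets = new ArrayList<>();
        for (long start = 0; start < endMs; start += bucketSizeMs) {
            buckets.add(new long[EventType.values().length]); // empty bucket summary
        }
        for (Event event : events) {
            int index = (int) (event.timestampMs / bucketSizeMs);
            append(buckets.get(index), event);
        }
        return buckets;
    }

    static List<long[]> demo() {
        List<Event> events = new ArrayList<>();
        events.add(new Event(100, EventType.SUCCESS));
        events.add(new Event(400, EventType.FAILURE));
        events.add(new Event(1200, EventType.TIMEOUT));
        return bucketize(events, 500, 1500); // three buckets of 500 ms each
    }

    public static void main(String[] args) {
        for (long[] bucket : demo()) {
            System.out.println(java.util.Arrays.toString(bucket));
        }
    }
}
```

In the demo the first bucket holds one success and one failure, the second stays empty, and the third holds one timeout.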
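How the bucket duration is computed
How is the duration of each bucket derived? It comes from our configuration. Take HealthCountsStream as an example:
metrics.rollingStats.timeInMilliseconds: length of the sliding window, 10000 ms by default
metrics.healthSnapshot.intervalInMilliseconds: interval at which health is checked, 500 ms by default; here it corresponds to the duration of one bucket
number of buckets in the sliding window = sliding-window length / bucket duration
For CumulativeCommandEventCounterStream:
metrics.rollingStats.timeInMilliseconds: length of the sliding window, 10000 ms by default
metrics.rollingStats.numBuckets: number of buckets the sliding window is cut into
bucket duration = sliding-window length / number of buckets
XXXStream classes with different responsibilities use different formulas and different configuration properties, but they all follow the same pattern, so they are not shown one by one.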
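As a worked example of the two formulas, the sketch below plugs in the defaults quoted above. BucketMathSketch is a hypothetical helper; the value 10 for numBuckets and the divisibility check are assumptions of this sketch, added only to keep the arithmetic exact.

```java
// Worked arithmetic for the two bucket formulas (hypothetical helper, not Hystrix code).
class BucketMathSketch {
    // HealthCountsStream style: number of buckets = window length / bucket duration.
    static int numBuckets(int rollingWindowMs, int bucketMs) {
        requireDivisible(rollingWindowMs, bucketMs);
        return rollingWindowMs / bucketMs;
    }

    // CumulativeCommandEventCounterStream style: bucket duration = window length / number of buckets.
    static int bucketMs(int rollingWindowMs, int numBuckets) {
        requireDivisible(rollingWindowMs, numBuckets);
        return rollingWindowMs / numBuckets;
    }

    // Assumption of this sketch: reject configurations that do not divide evenly.
    private static void requireDivisible(int windowMs, int divisor) {
        if (divisor <= 0 || windowMs % divisor != 0) {
            throw new IllegalArgumentException(windowMs + " is not evenly divisible by " + divisor);
        }
    }

    public static void main(String[] args) {
        System.out.println("health buckets: " + numBuckets(10000, 500));   // 10000 / 500
        System.out.println("bucket duration (ms): " + bucketMs(10000, 10)); // 10000 / 10
    }
}
```

With the defaults, the health stream keeps 20 buckets of 500 ms each, and a 10000 ms window cut into 10 buckets gives buckets of 1000 ms.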
inputEventStream
inputEventStream can be thought of as the data stream that the window samples from. It is passed in by the subclass; a quick look at two of them:
    // HealthCountsStream
    private HealthCountsStream(final HystrixCommandKey commandKey, final int numBuckets, final int bucketSizeInMs,
                               Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion) {
        super(HystrixCommandCompletionStream.getInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, healthCheckAccumulator);
    }

    // RollingThreadPoolEventCounterStream
    private RollingThreadPoolEventCounterStream(HystrixThreadPoolKey threadPoolKey, int numCounterBuckets, int counterBucketSizeInMs,
                                                Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion,
                                                Func2<long[], long[], long[]> reduceBucket) {
        super(HystrixThreadPoolCompletionStream.getInstance(threadPoolKey), numCounterBuckets, counterBucketSizeInMs, reduceCommandCompletion, reduceBucket);
    }
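So inputEventStream is simply HystrixCommandCompletionStream, HystrixThreadPoolCompletionStream, or one of the others. Taking HystrixCommandCompletionStream as the example, it is the very stream that data is written to in the second part above, on reporting metric data. inputEventStream.observe() is the readOnlyStream of HystrixCommandCompletionStream, that is, the read-only Observable of the Subject. (If this is unclear, go back to the end of the second part.)
First, look at BucketedCumulativeCounterStream, the parent class of the cumulative counters: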
    protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                                              Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                                              Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);

        this.sourceStream = bucketedStream
                .scan(getEmptyOutputValue(), reduceBucket)
                .skip(numBuckets)
                // ... (code omitted) ...
    }
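bucketedStream is the stream of aggregated buckets from 3.1, one summary per bucket. scan is applied to it: scan walks the buckets in order and applies the given function to each one, emitting the running result every time. It takes two arguments: the first is the initial value, which is fed into the first invocation in place of a previous result, and the second is the function to apply, which receives the previous result and the current bucket. After scan, skip(numBuckets) drops the first numBuckets emissions, which come from the initial value and the empty start-up buckets rather than from real data.
How is the function passed to scan implemented? It is the key to cumulative counting and is supplied by the subclass, which in this section is CumulativeCommandEventCounterStream: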
    CumulativeCommandEventCounterStream newStream = new CumulativeCommandEventCounterStream(commandKey, numBuckets, bucketSizeInMs,
            HystrixCommandMetrics.appendEventToBucket, HystrixCommandMetrics.bucketAggregator);
It passes HystrixCommandMetrics.bucketAggregator; here is the function body:
    public static final Func2<long[], long[], long[]> bucketAggregator = new Func2<long[], long[], long[]>() {
        @Override
        public long[] call(long[] cumulativeEvents, long[] bucketEventCounts) {
            for (HystrixEventType eventType : ALL_EVENT_TYPES) {
                switch (eventType) {
                    case EXCEPTION_THROWN:
                        for (HystrixEventType exceptionEventType : HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES) {
                            cumulativeEvents[eventType.ordinal()] += bucketEventCounts[exceptionEventType.ordinal()];
                        }
                        break;
                    default:
                        cumulativeEvents[eventType.ordinal()] += bucketEventCounts[eventType.ordinal()];
                        break;
                }
            }
            return cumulativeEvents;
        }
    };
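The call() method takes two arguments: the first is the result computed so far and the second is the counts of the current bucket. The body is easy to follow: it adds the current bucket's count for each event type to the running total.
That is how counting for one command is implemented; the other cumulative counters work the same way.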
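The scan-then-skip pipeline can be reproduced on a plain list to see which values survive. The sketch below is a simplified imitation: CumulativeScanSketch and its two-column buckets are hypothetical, the EXCEPTION_THROWN special case is left out, and unlike bucketAggregator it copies the array instead of mutating it so that every intermediate result can be inspected.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK imitation of scan(seed, aggregator).skip(numBuckets) (not Hystrix code).
class CumulativeScanSketch {
    // Simplified counterpart of bucketAggregator: add the bucket's counts to the running total.
    static long[] aggregate(long[] cumulative, long[] bucket) {
        long[] next = cumulative.clone();
        for (int i = 0; i < next.length; i++) {
            next[i] += bucket[i];
        }
        return next;
    }

    // Like scan: emits the seed first, then the running total after each bucket.
    static List<long[]> scan(long[] seed, List<long[]> buckets) {
        List<long[]> emitted = new ArrayList<>();
        long[] accumulated = seed;
        emitted.add(accumulated);
        for (long[] bucket : buckets) {
            accumulated = aggregate(accumulated, bucket);
            emitted.add(accumulated);
        }
        return emitted;
    }

    static List<long[]> demo() {
        int numBuckets = 2;
        List<long[]> buckets = new ArrayList<>();
        buckets.add(new long[]{0, 0}); // empty start-up bucket
        buckets.add(new long[]{0, 0}); // empty start-up bucket
        buckets.add(new long[]{3, 1}); // first real bucket: 3 successes, 1 failure
        buckets.add(new long[]{2, 0}); // second real bucket
        List<long[]> scanned = scan(new long[]{0, 0}, buckets);
        return scanned.subList(numBuckets, scanned.size()); // like skip(numBuckets)
    }

    public static void main(String[] args) {
        for (long[] total : demo()) {
            System.out.println(java.util.Arrays.toString(total));
        }
    }
}
```

The surviving totals never decrease, which is the difference from the rolling streams: nothing is ever subtracted when old buckets age out.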
For sliding-window counting, go straight to the code of the parent class, BucketedRollingCounterStream:
    protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                           final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           final Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
        Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
            @Override
            public Observable<Output> call(Observable<Bucket> window) {
                return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
            }
        };
        this.sourceStream = bucketedStream
                .window(numBuckets, 1)
                .flatMap(reduceWindowToSummary)
                // ... (code omitted) ...
    }
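As with the cumulative counter, this operates on the parent's stream of buckets, but here it uses window(): the first argument is the number of buckets per window and the second is how many buckets the window advances each time. numBuckets is the number of buckets in our sliding window.
The first row can be read as the sliding window's data before it moves. After the function in flatMap has run, the window slides forward by one bucket, so the bucket 23 5 2 0 is dropped and the newest bucket 45 6 2 0 comes in.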
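The sliding step can be tried out on a small list. In the sketch below, SlidingWindowSketch is hypothetical and only complete windows are produced, which matches the effect of skip(numBuckets) inside reduceWindowToSummary: scan emits numBuckets + 1 values for a full window, so only the final sum passes, and a window with fewer buckets emits nothing. The buckets 23 5 2 0 and 45 6 2 0 are the ones mentioned above; the middle bucket is made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK imitation of window(numBuckets, 1) followed by a per-window sum (not Hystrix code).
class SlidingWindowSketch {
    // All runs of `size` consecutive buckets, advancing by one bucket at a time.
    static List<List<long[]>> windows(List<long[]> buckets, int size) {
        List<List<long[]>> result = new ArrayList<>();
        for (int start = 0; start + size <= buckets.size(); start++) {
            result.add(new ArrayList<>(buckets.subList(start, start + size)));
        }
        return result;
    }

    // Column-wise sum of every bucket in one window.
    static long[] sum(List<long[]> window) {
        long[] total = new long[window.get(0).length];
        for (long[] bucket : window) {
            for (int i = 0; i < total.length; i++) {
                total[i] += bucket[i];
            }
        }
        return total;
    }

    static List<long[]> demo() {
        List<long[]> buckets = new ArrayList<>();
        buckets.add(new long[]{23, 5, 2, 0}); // oldest bucket, dropped when the window slides
        buckets.add(new long[]{10, 1, 0, 0}); // made-up middle bucket
        buckets.add(new long[]{45, 6, 2, 0}); // newest bucket
        List<long[]> sums = new ArrayList<>();
        for (List<long[]> window : windows(buckets, 2)) {
            sums.add(sum(window));
        }
        return sums;
    }

    public static void main(String[] args) {
        for (long[] windowSum : demo()) {
            System.out.println(java.util.Arrays.toString(windowSum));
        }
    }
}
```

Each window is summed from scratch here, so the oldest bucket simply stops contributing once the window has moved past it.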
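So how is the data inside the window processed each time it slides? That is the job of the function passed to flatMap: reduceWindowToSummary folds the window's buckets with reduceBucket, which the concrete subclass stream supplies. Let's study HealthCountsStream: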
    private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts> healthCheckAccumulator =
            new Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>() {
        @Override
        public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) {
            return healthCounts.plus(bucketEventCounts);
        }
    };

    // HystrixCommandMetrics.HealthCounts#plus
    public HealthCounts plus(long[] eventTypeCounts) {
        long updatedTotalCount = totalCount;
        long updatedErrorCount = errorCount;

        long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
        long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
        long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
        long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
        long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];

        updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
        updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
        return new HealthCounts(updatedTotalCount, updatedErrorCount);
    }
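The implementation is self-explanatory: it tallies the successes, failures, timeouts, thread-pool rejections and semaphore rejections within the current sliding window, adding all five to the total count and the last four to the error count.
The job of this stream is to probe the availability of the service, and it is the data source that the Hystrix circuit breaker relies on to decide whether to trip.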
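To see the accumulation with numbers, here is a small immutable sketch shaped like the plus method above. HealthCountsSketch and its index constants are hypothetical, and the errorPercentage helper is an assumption added to show how a failure ratio could be read off the two counts; it is not taken from the code shown above.

```java
// Simplified, hypothetical counterpart of HealthCounts (not Hystrix code).
final class HealthCountsSketch {
    // Assumed positions of each event type in a bucket's count array.
    static final int SUCCESS = 0, FAILURE = 1, TIMEOUT = 2, THREAD_POOL_REJECTED = 3, SEMAPHORE_REJECTED = 4;

    final long totalCount;
    final long errorCount;

    HealthCountsSketch(long totalCount, long errorCount) {
        this.totalCount = totalCount;
        this.errorCount = errorCount;
    }

    // Same shape as plus(): returns a new value, leaving the previous one untouched.
    HealthCountsSketch plus(long[] counts) {
        long errors = counts[FAILURE] + counts[TIMEOUT]
                + counts[THREAD_POOL_REJECTED] + counts[SEMAPHORE_REJECTED];
        return new HealthCountsSketch(totalCount + counts[SUCCESS] + errors, errorCount + errors);
    }

    // Assumption of this sketch: integer percentage of errors, 0 when there were no calls.
    int errorPercentage() {
        return totalCount == 0 ? 0 : (int) (100.0 * errorCount / totalCount);
    }

    // Two buckets: 23 successes with 7 errors, then 45 successes with 8 errors.
    static HealthCountsSketch demo() {
        return new HealthCountsSketch(0, 0)
                .plus(new long[]{23, 5, 2, 0, 0})
                .plus(new long[]{45, 6, 2, 0, 0});
    }

    public static void main(String[] args) {
        HealthCountsSketch health = demo();
        System.out.println(health.errorCount + " errors out of " + health.totalCount
                + " calls, " + health.errorPercentage() + "%");
    }
}
```

Returning a fresh object from plus means a snapshot handed to a reader cannot change underneath it while the next window is being summed.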
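Hystrix's sliding-window design may be somewhat harder to follow than other implementations, mainly because most of us are not familiar enough with RxJava. That is not a real obstacle, though: reading through it patiently a few times is enough.
This article walked from the reporting of metric data to its collection, step by step lifting the veil on how Hystrix gathers metrics. To close, a diagram borrowed from an expert summarizes the content of this article.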
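References
Official documentation: How it Works
Official documentation: Configuration
Hystrix 1.5 滑动窗口实现原理总结 (a summary of how the Hystrix 1.5 sliding window is implemented)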